You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/04/03 00:00:50 UTC
[5/7] ambari git commit: AMBARI-10290. Expose available host metrics
across hostcomponents. (swagle)
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..a0e4e32
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+/**
+ * Static factory for the two host-level metric aggregators backed by
+ * Phoenix/HBase: a minute aggregator that reads the precision record table,
+ * and an hourly aggregator that rolls the minute aggregates up further.
+ */
+public class TimelineMetricAggregatorFactory {
+ // Checkpoint file names; resolved against the configured checkpoint dir.
+ private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
+ "timeline-metrics-host-aggregator-checkpoint";
+ private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+ "timeline-metrics-host-aggregator-hourly-checkpoint";
+
+ /**
+ * Create the minute host aggregator: reads METRICS_RECORD and writes
+ * METRICS_AGGREGATE_MINUTE. Sleep interval defaults to 300 seconds.
+ *
+ * @param hBaseAccessor Phoenix/HBase accessor used for reads and writes
+ * @param metricsConf configuration supplying aggregator tuning settings
+ * @return a configured minute-level {@link TimelineMetricAggregator}
+ */
+ public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ MINUTE_AGGREGATE_CHECKPOINT_FILE);
+ long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins
+
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+ String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
+
+ String inputTableName = METRICS_RECORD_TABLE_NAME;
+ String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+ // NOTE(review): trailing 120000l (2 min in millis) is an undocumented magic
+ // number — presumably a checkpoint cut-off / interval; confirm against the
+ // TimelineMetricAggregator constructor and consider a named constant.
+ return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam,
+ inputTableName,
+ outputTableName,
+ 120000l);
+ }
+
+ /**
+ * Create the hourly host aggregator: reads METRICS_AGGREGATE_MINUTE and
+ * writes METRICS_AGGREGATE_HOURLY. Sleep interval defaults to 3600 seconds.
+ *
+ * @param hBaseAccessor Phoenix/HBase accessor used for reads and writes
+ * @param metricsConf configuration supplying aggregator tuning settings
+ * @return a configured hourly {@link TimelineMetricAggregator}
+ */
+ public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+ long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
+
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+ String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
+
+ String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+ String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+
+ // NOTE(review): trailing 3600000l (1 hour in millis) — same undocumented
+ // constructor argument as above; confirm meaning and name it.
+ return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepIntervalMillis,
+ checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam,
+ inputTableName,
+ outputTableName,
+ 3600000l);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
new file mode 100644
index 0000000..0c8ded2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+
+/**
+ * Aggregator responsible for providing app level host aggregates. This task
+ * is accomplished without doing a round trip to storage, rather
+ * TimelineMetricClusterAggregators are responsible for the lifecycle of
+ * {@link TimelineMetricAppAggregator} and provide the raw data to aggregate.
+ */
+public class TimelineMetricAppAggregator {
+ private static final Log LOG = LogFactory.getLog(TimelineMetricAppAggregator.class);
+ // Lookup to check candidacy of an app
+ private final List<String> appIdsToAggregate;
+ // Map to lookup apps on a host
+ private Map<String, List<String>> hostedAppsMap = new HashMap<String, List<String>>();
+
+ // Per-cycle aggregate state; non-null only between init() and cleanup().
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics;
+
+ /**
+ * @param metricsConf configuration holding the comma separated list of
+ * appIds eligible for host aggregation (CLUSTER_AGGREGATOR_APP_IDS)
+ */
+ public TimelineMetricAppAggregator(Configuration metricsConf) {
+ appIdsToAggregate = getAppIdsForHostAggregation(metricsConf);
+ LOG.info("AppIds configured for aggregation: " + appIdsToAggregate);
+ }
+
+ /**
+ * Lifecycle method to initialize aggregation cycle.
+ */
+ public void init() {
+ LOG.debug("Initializing aggregation cycle.");
+ aggregateClusterMetrics = new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+ }
+
+ /**
+ * Lifecycle method to indicate end of aggregation cycle.
+ */
+ public void cleanup() {
+ LOG.debug("Cleanup aggregated data.");
+ aggregateClusterMetrics = null;
+ }
+
+ /**
+ * Useful for resetting apps that no-longer need aggregation without restart.
+ */
+ public void destroy() {
+ LOG.debug("Cleanup aggregated data as well as in-memory state.");
+ aggregateClusterMetrics = null;
+ hostedAppsMap = new HashMap<String, List<String>>();
+ }
+
+ /**
+ * Calculate aggregates if the clusterMetric is a Host metric for recorded
+ * apps that are housed by this host.
+ *
+ * @param clusterMetric {@link TimelineClusterMetric} Host / App metric
+ * @param hostname This is the hostname from which this clusterMetric originated.
+ * @param metricValue The metric value for this metric.
+ */
+ public void processTimelineClusterMetric(TimelineClusterMetric clusterMetric,
+ String hostname, Double metricValue) {
+
+ String appId = clusterMetric.getAppId();
+ if (appId == null) {
+ return; // No real use case except tests
+ }
+
+ // If metric is a host metric and host has apps on it
+ if (appId.equalsIgnoreCase(HOST_APP_ID)) {
+ // Candidate metric, update app aggregates
+ if (hostedAppsMap.containsKey(hostname)) {
+ updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue);
+ }
+ } else {
+ // Build the hostedapps map if not a host metric
+ // Check app candidacy for host aggregation
+ if (appIdsToAggregate.contains(appId)) {
+ List<String> appIds = hostedAppsMap.get(hostname);
+ if (appIds == null) {
+ appIds = new ArrayList<String>();
+ hostedAppsMap.put(hostname, appIds);
+ }
+ if (!appIds.contains(appId)) {
+ appIds.add(appId);
+ LOG.info("Adding appId to hosted apps: appId = " +
+ clusterMetric.getAppId() + ", hostname = " + hostname);
+ }
+ }
+ }
+ }
+
+ /**
+ * Build a cluster app metric from a host metric
+ */
+ private void updateAppAggregatesFromHostMetric(TimelineClusterMetric clusterMetric,
+ String hostname, Double metricValue) {
+
+ if (aggregateClusterMetrics == null) {
+ LOG.error("Aggregation requested without init call.");
+ return;
+ }
+
+ // apps is non-null here: the caller checks hostedAppsMap.containsKey(hostname).
+ List<String> apps = hostedAppsMap.get(hostname);
+ for (String appId : apps) {
+ // Add a new cluster aggregate metric if none exists
+ TimelineClusterMetric appTimelineClusterMetric =
+ new TimelineClusterMetric(clusterMetric.getMetricName(),
+ appId,
+ clusterMetric.getInstanceId(),
+ clusterMetric.getTimestamp(),
+ clusterMetric.getType()
+ );
+
+ MetricClusterAggregate clusterAggregate = aggregateClusterMetrics.get(appTimelineClusterMetric);
+
+ if (clusterAggregate == null) {
+ clusterAggregate = new MetricClusterAggregate(metricValue, 1, null, metricValue, metricValue);
+ aggregateClusterMetrics.put(appTimelineClusterMetric, clusterAggregate);
+ } else {
+ clusterAggregate.updateSum(metricValue);
+ clusterAggregate.updateNumberOfHosts(1);
+ clusterAggregate.updateMax(metricValue);
+ clusterAggregate.updateMin(metricValue);
+ }
+
+ }
+ }
+
+ /**
+ * Return the current aggregated data. NOTE(review): despite the original
+ * wording ("copy"), this returns the live map, not a defensive copy, and
+ * is null outside an init()/cleanup() cycle.
+ */
+ public Map<TimelineClusterMetric, MetricClusterAggregate> getAggregateClusterMetrics() {
+ return aggregateClusterMetrics;
+ }
+
+ // Parse the comma-separated appId list from config; entries are
+ // whitespace-stripped. Empty/absent config yields an empty list.
+ private List<String> getAppIdsForHostAggregation(Configuration metricsConf) {
+ String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS);
+ if (!StringUtils.isEmpty(appIds)) {
+ return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
+ }
+ return Collections.emptyList();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..68b2ba9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+
+/**
+ * Aggregates a metric across all hosts in the cluster. Reads metrics from
+ * the precision table and saves into the aggregate.
+ */
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+ private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
+ private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
+ "timeline-metrics-cluster-aggregator-checkpoint";
+ private final String checkpointLocation;
+ private final Long sleepIntervalMillis;
+ // NOTE(review): public mutable-looking field (it is final, but public);
+ // consider private with an accessor.
+ public final int timeSliceIntervalMillis;
+ private final Integer checkpointCutOffMultiplier;
+ private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
+ // Aggregator to perform app-level aggregates for host metrics
+ private final TimelineMetricAppAggregator appAggregator;
+
+ /**
+ * @param hBaseAccessor Phoenix/HBase accessor used for reads and writes
+ * @param metricsConf configuration supplying sleep interval (default 120s),
+ * time slice interval (default 15s) and checkpoint cut-off multiplier
+ */
+ public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf) {
+ super(hBaseAccessor, metricsConf);
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+ checkpointLocation = FilenameUtils.concat(checkpointDir,
+ CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
+
+ sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
+ timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt
+ (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
+ checkpointCutOffMultiplier =
+ metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+ appAggregator = new TimelineMetricAppAggregator(metricsConf);
+ }
+
+ @Override
+ protected String getCheckpointLocation() {
+ return checkpointLocation;
+ }
+
+ /**
+ * Slice the result set into time buckets, aggregate across hosts, and
+ * persist. Also runs one app-aggregation cycle (init/cleanup) so app-level
+ * rollups ride along with the cluster aggregates.
+ */
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime)
+ throws SQLException, IOException {
+ List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+ // Initialize app aggregates for host metrics
+ appAggregator.init();
+ Map<TimelineClusterMetric, MetricClusterAggregate>
+ aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
+
+ LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
+ appAggregator.cleanup();
+ }
+
+ /**
+ * Build the precision-table query for [startTime, endTime], ordered so rows
+ * for the same metric arrive contiguously.
+ */
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ endTime, null, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(String.format(GET_METRIC_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+ METRICS_RECORD_TABLE_NAME));
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
+ }
+
+ // Partition [startTime, endTime) into contiguous [lo, hi) slices of
+ // timeSliceIntervalMillis; the final slice may extend past endTime.
+ private List<Long[]> getTimeSlices(long startTime, long endTime) {
+ List<Long[]> timeSlices = new ArrayList<Long[]>();
+ long sliceStartTime = startTime;
+ while (sliceStartTime < endTime) {
+ timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
+ sliceStartTime += timeSliceIntervalMillis;
+ }
+ return timeSlices;
+ }
+
+ /**
+ * Fold every precision record into per-slice cluster aggregates
+ * (sum/min/max/host-count), feeding each sliced value to the app
+ * aggregator as a side effect; app-level aggregates are merged in at the end.
+ */
+ private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+ throws SQLException, IOException {
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+ // Create time slices
+
+ while (rs.next()) {
+ TimelineMetric metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+
+ Map<TimelineClusterMetric, Double> clusterMetrics =
+ sliceFromTimelineMetric(metric, timeSlices);
+
+ if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+ for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+ clusterMetrics.entrySet()) {
+
+ TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+ Double avgValue = clusterMetricEntry.getValue();
+
+ MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+
+ if (aggregate == null) {
+ aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
+ aggregateClusterMetrics.put(clusterMetric, aggregate);
+ } else {
+ aggregate.updateSum(avgValue);
+ aggregate.updateNumberOfHosts(1);
+ aggregate.updateMax(avgValue);
+ aggregate.updateMin(avgValue);
+ }
+ // Update app level aggregates
+ appAggregator.processTimelineClusterMetric(clusterMetric,
+ metric.getHostName(), avgValue);
+ }
+ }
+ }
+ // Add app level aggregates to save
+ aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
+ return aggregateClusterMetrics;
+ }
+
+ @Override
+ protected Long getSleepIntervalMillis() {
+ return sleepIntervalMillis;
+ }
+
+ @Override
+ protected Integer getCheckpointCutOffMultiplier() {
+ return checkpointCutOffMultiplier;
+ }
+
+ @Override
+ public boolean isDisabled() {
+ return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false);
+ }
+
+ /**
+ * Map each (timestamp, value) of one host metric onto its time slice,
+ * keyed by slice start. Returns null for a metric with no values.
+ * NOTE(review): collisions within a slice are folded with (old + new) / 2 —
+ * a running pairwise average that weights later samples more heavily than a
+ * true mean; confirm this is intended.
+ */
+ private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+ TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+
+ if (timelineMetric.getMetricValues().isEmpty()) {
+ return null;
+ }
+
+ Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+ new HashMap<TimelineClusterMetric, Double>();
+
+ for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
+ // TODO: investigate null values - pre filter
+ if (metric.getValue() == null) {
+ continue;
+ }
+ // NOTE(review): key is already a Long; parseLong(toString()) is redundant.
+ Long timestamp = getSliceTimeForMetric(timeSlices,
+ Long.parseLong(metric.getKey().toString()));
+ if (timestamp != -1) {
+ // Metric is within desired time range
+ TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+ timelineMetric.getMetricName(),
+ timelineMetric.getAppId(),
+ timelineMetric.getInstanceId(),
+ timestamp,
+ timelineMetric.getType());
+ if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
+ timelineClusterMetricMap.put(clusterMetric, metric.getValue());
+ } else {
+ Double oldValue = timelineClusterMetricMap.get(clusterMetric);
+ Double newValue = (oldValue + metric.getValue()) / 2;
+ timelineClusterMetricMap.put(clusterMetric, newValue);
+ }
+ }
+ }
+
+ return timelineClusterMetricMap;
+ }
+
+ /**
+ * Return beginning of the time slice into which the metric fits,
+ * or -1 if the timestamp falls outside every slice.
+ */
+ private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+ for (Long[] timeSlice : timeSlices) {
+ if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
+ return timeSlice[0];
+ }
+ }
+ return -1l;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
new file mode 100644
index 0000000..264e4e6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+/**
+ * Rolls cluster-wide minute aggregates up to hourly resolution: reads the
+ * cluster aggregate table and writes the hourly cluster aggregate table.
+ */
+public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggregator {
+ private static final Log LOG = LogFactory.getLog
+ (TimelineMetricClusterAggregatorHourly.class);
+ private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
+ "timeline-metrics-cluster-aggregator-hourly-checkpoint";
+ private final String checkpointLocation;
+ private final long sleepIntervalMillis;
+ private final Integer checkpointCutOffMultiplier;
+ private long checkpointCutOffIntervalMillis;
+ // Local 1-hour query hint delta; note PhoenixTransactSQL declares a
+ // constant of the same name used by the minute-level aggregator.
+ private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour
+ private final TimelineClusterMetricReader timelineClusterMetricReader
+ = new TimelineClusterMetricReader(true);
+
+ /**
+ * @param hBaseAccessor Phoenix/HBase accessor used for reads and writes
+ * @param metricsConf configuration supplying sleep interval (default 3600s),
+ * cut-off interval (default 7200s) and cut-off multiplier (default 2)
+ */
+ public TimelineMetricClusterAggregatorHourly(
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ super(hBaseAccessor, metricsConf);
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+ checkpointLocation = FilenameUtils.concat(checkpointDir,
+ CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
+
+ sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
+ checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
+ checkpointCutOffMultiplier = metricsConf.getInt
+ (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+ }
+
+ @Override
+ protected String getCheckpointLocation() {
+ return checkpointLocation;
+ }
+
+ /**
+ * Aggregate one window of cluster-minute rows into hourly records and save.
+ */
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime)
+ throws SQLException, IOException {
+ Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+ aggregateMetricsFromResultSet(rs);
+
+ LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
+ METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+ }
+
+ /**
+ * Build the cluster-aggregate-table query for [startTime, endTime], ordered
+ * so rows of the same metric are contiguous (aggregateMetricsFromResultSet
+ * depends on this ordering).
+ */
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime,
+ long endTime) {
+ Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ endTime, null, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+ METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
+ }
+
+ /**
+ * Collapse the ordered minute-aggregate rows into one MetricHostAggregate
+ * per metric. Relies on the ORDER BY from prepareMetricQueryCondition so
+ * all rows for a metric (equal except timestamp) arrive back to back.
+ */
+ private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs)
+ throws IOException, SQLException {
+
+ TimelineClusterMetric existingMetric = null;
+ MetricHostAggregate hostAggregate = null;
+ Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+ new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+
+ while (rs.next()) {
+ TimelineClusterMetric currentMetric =
+ timelineClusterMetricReader.fromResultSet(rs);
+ MetricClusterAggregate currentHostAggregate =
+ getMetricClusterAggregateFromResultSet(rs);
+
+ if (existingMetric == null) {
+ // First row
+ existingMetric = currentMetric;
+ hostAggregate = new MetricHostAggregate();
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ }
+
+ if (existingMetric.equalsExceptTime(currentMetric)) {
+ // Recalculate totals with current metric
+ updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+
+ } else {
+ // Switched over to a new metric - save existing
+ hostAggregate = new MetricHostAggregate();
+ updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ existingMetric = currentMetric;
+ }
+
+ }
+
+ return hostAggregateMap;
+ }
+
+ // Merge one minute-level cluster aggregate into the running hourly
+ // aggregate; host count of the row becomes additional samples.
+ private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) {
+ agg.updateMax(currentClusterAggregate.getMax());
+ agg.updateMin(currentClusterAggregate.getMin());
+ agg.updateSum(currentClusterAggregate.getSum());
+ agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
+ }
+
+ @Override
+ protected Long getSleepIntervalMillis() {
+ return sleepIntervalMillis;
+ }
+
+ @Override
+ protected Integer getCheckpointCutOffMultiplier() {
+ return checkpointCutOffMultiplier;
+ }
+
+ @Override
+ protected Long getCheckpointCutOffIntervalMillis() {
+ return checkpointCutOffIntervalMillis;
+ }
+
+ @Override
+ public boolean isDisabled() {
+ return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
new file mode 100644
index 0000000..40a9648
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class TimelineMetricReadHelper {
+
+ private boolean ignoreInstance = false;
+
+ public TimelineMetricReadHelper() {}
+
+ public TimelineMetricReadHelper(boolean ignoreInstance) {
+ this.ignoreInstance = ignoreInstance;
+ }
+
+ public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
+ throws SQLException, IOException {
+ TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
+ Map<Long, Double> sortedByTimeMetrics = new TreeMap<Long, Double>(
+ PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS")));
+ metric.setMetricValues(sortedByTimeMetrics);
+ return metric;
+ }
+
+ /**
+ * Returns common part of timeline metrics record without the values.
+ */
+ public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs)
+ throws SQLException {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setMetricName(rs.getString("METRIC_NAME"));
+ metric.setAppId(rs.getString("APP_ID"));
+ if (!ignoreInstance) metric.setInstanceId(rs.getString("INSTANCE_ID"));
+ metric.setHostName(rs.getString("HOSTNAME"));
+ metric.setTimestamp(rs.getLong("SERVER_TIME"));
+ metric.setStartTime(rs.getLong("START_TIME"));
+ metric.setType(rs.getString("UNITS"));
+ return metric;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
new file mode 100644
index 0000000..b52748f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
@@ -0,0 +1,46 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;

import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;

import java.util.List;

/**
 * Filter criteria for a metrics query, rendered by implementations as a
 * parameterized WHERE clause plus ordering/limit settings.
 */
public interface Condition {
  /** @return true when no filter criteria are set at all. */
  boolean isEmpty();

  /** @return metric names to match (may contain SQL LIKE wildcards), or null. */
  List<String> getMetricNames();
  /** @return true for a latest-value (point in time) query with no time range. */
  boolean isPointInTime();
  boolean isGrouped();
  /** Overrides the generated SQL with a caller-supplied statement. */
  void setStatement(String statement);
  String getHostname();
  Precision getPrecision();
  void setPrecision(Precision precision);
  String getAppId();
  String getInstanceId();
  /** @return the parameterized WHERE-clause body built from the set criteria. */
  StringBuilder getConditionClause();
  /** @return an ORDER BY clause over the registered columns, or null if none. */
  String getOrderByClause(boolean asc);
  String getStatement();
  Long getStartTime();
  Long getEndTime();
  /** @return row limit for the query, or null when no limit applies. */
  Integer getLimit();
  Integer getFetchSize();
  void setFetchSize(Integer fetchSize);
  void addOrderByColumn(String column);
  void setNoLimit();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
new file mode 100644
index 0000000..24239a0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
/**
 * Abstraction over how JDBC connections to the metric store are obtained,
 * allowing the Phoenix data source to be swapped out (e.g. for tests).
 */
public interface ConnectionProvider {
  /**
   * @return a live JDBC connection to the metric store
   * @throws SQLException if a connection cannot be established
   */
  Connection getConnection() throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
new file mode 100644
index 0000000..9d6b7df
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+public class DefaultCondition implements Condition {
+ List<String> metricNames;
+ String hostname;
+ String appId;
+ String instanceId;
+ Long startTime;
+ Long endTime;
+ Precision precision;
+ Integer limit;
+ boolean grouped;
+ boolean noLimit = false;
+ Integer fetchSize;
+ String statement;
+ Set<String> orderByColumns = new LinkedHashSet<String>();
+
+ public DefaultCondition(List<String> metricNames, String hostname, String appId,
+ String instanceId, Long startTime, Long endTime, Precision precision,
+ Integer limit, boolean grouped) {
+ this.metricNames = metricNames;
+ this.hostname = hostname;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.precision = precision;
+ this.limit = limit;
+ this.grouped = grouped;
+ }
+
+ public String getStatement() {
+ return statement;
+ }
+
+ public void setStatement(String statement) {
+ this.statement = statement;
+ }
+
+ public List<String> getMetricNames() {
+ return metricNames == null || metricNames.isEmpty() ? null : metricNames;
+ }
+
+ public StringBuilder getConditionClause() {
+ StringBuilder sb = new StringBuilder();
+ boolean appendConjunction = false;
+ StringBuilder metricsLike = new StringBuilder();
+ StringBuilder metricsIn = new StringBuilder();
+
+ if (getMetricNames() != null) {
+ for (String name : getMetricNames()) {
+ if (name.contains("%")) {
+ if (metricsLike.length() > 1) {
+ metricsLike.append(" OR ");
+ }
+ metricsLike.append("METRIC_NAME LIKE ?");
+ } else {
+ if (metricsIn.length() > 0) {
+ metricsIn.append(", ");
+ }
+ metricsIn.append("?");
+ }
+ }
+
+ if (metricsIn.length()>0) {
+ sb.append("(METRIC_NAME IN (");
+ sb.append(metricsIn);
+ sb.append(")");
+ appendConjunction = true;
+ }
+
+ if (metricsLike.length() > 0) {
+ if (appendConjunction) {
+ sb.append(" OR ");
+ } else {
+ sb.append("(");
+ }
+ sb.append(metricsLike);
+ appendConjunction = true;
+ }
+
+ if (appendConjunction) {
+ sb.append(")");
+ }
+ }
+
+ appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?");
+ appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
+ appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
+ appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
+ append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
+
+ return sb;
+ }
+
+ protected static boolean append(StringBuilder sb,
+ boolean appendConjunction,
+ Object value, String str) {
+ if (value != null) {
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+
+ sb.append(str);
+ appendConjunction = true;
+ }
+ return appendConjunction;
+ }
+
+ public String getHostname() {
+ return hostname == null || hostname.isEmpty() ? null : hostname;
+ }
+
+ public Precision getPrecision() {
+ return precision;
+ }
+
+ public void setPrecision(Precision precision) {
+ this.precision = precision;
+ }
+
+ public String getAppId() {
+ if (appId != null && !appId.isEmpty()) {
+ if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER")) ) {
+ return appId.toLowerCase();
+ } else {
+ return appId;
+ }
+ }
+ return null;
+ }
+
+ public String getInstanceId() {
+ return instanceId == null || instanceId.isEmpty() ? null : instanceId;
+ }
+
+ /**
+ * Convert to millis.
+ */
+ public Long getStartTime() {
+ if (startTime == null) {
+ return null;
+ } else if (startTime < 9999999999l) {
+ return startTime * 1000;
+ } else {
+ return startTime;
+ }
+ }
+
+ public Long getEndTime() {
+ if (endTime == null) {
+ return null;
+ }
+ if (endTime < 9999999999l) {
+ return endTime * 1000;
+ } else {
+ return endTime;
+ }
+ }
+
+ public void setNoLimit() {
+ this.noLimit = true;
+ }
+
+ public Integer getLimit() {
+ if (noLimit) {
+ return null;
+ }
+ return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
+ }
+
+ public boolean isGrouped() {
+ return grouped;
+ }
+
+ public boolean isPointInTime() {
+ return getStartTime() == null && getEndTime() == null;
+ }
+
+ public boolean isEmpty() {
+ return (metricNames == null || metricNames.isEmpty())
+ && (hostname == null || hostname.isEmpty())
+ && (appId == null || appId.isEmpty())
+ && (instanceId == null || instanceId.isEmpty())
+ && startTime == null
+ && endTime == null;
+ }
+
+ public Integer getFetchSize() {
+ return fetchSize;
+ }
+
+ public void setFetchSize(Integer fetchSize) {
+ this.fetchSize = fetchSize;
+ }
+
+ public void addOrderByColumn(String column) {
+ orderByColumns.add(column);
+ }
+
+ public String getOrderByClause(boolean asc) {
+ String orderByStr = " ORDER BY ";
+ if (!orderByColumns.isEmpty()) {
+ StringBuilder sb = new StringBuilder(orderByStr);
+ for (String orderByColumn : orderByColumns) {
+ if (sb.length() != orderByStr.length()) {
+ sb.append(", ");
+ }
+ sb.append(orderByColumn);
+ if (!asc) {
+ sb.append(" DESC");
+ }
+ }
+ sb.append(" ");
+ return sb.toString();
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "Condition{" +
+ "metricNames=" + metricNames +
+ ", hostname='" + hostname + '\'' +
+ ", appId='" + appId + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ ", startTime=" + startTime +
+ ", endTime=" + endTime +
+ ", limit=" + limit +
+ ", grouped=" + grouped +
+ ", orderBy=" + orderByColumns +
+ ", noLimit=" + noLimit +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
new file mode 100644
index 0000000..562049b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class DefaultPhoenixDataSource implements ConnectionProvider {
+
+ static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
+ private static final String ZOOKEEPER_CLIENT_PORT =
+ "hbase.zookeeper.property.clientPort";
+ private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+ private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
+ private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
+ private final String url;
+
+ public DefaultPhoenixDataSource(Configuration hbaseConf) {
+ String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT,
+ "2181");
+ String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
+ String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase");
+ if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+ throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+ "access HBase store using Phoenix.");
+ }
+
+ url = String.format(connectionUrl,
+ zookeeperQuorum,
+ zookeeperClientPort,
+ znodeParent);
+ }
+
+ /**
+ * Get JDBC connection to HBase store. Assumption is that the hbase
+ * configuration is present on the classpath and loaded by the caller into
+ * the Configuration object.
+ * Phoenix already caches the HConnection between the client and HBase
+ * cluster.
+ *
+ * @return @java.sql.Connection
+ */
+ public Connection getConnection() throws SQLException {
+
+ LOG.debug("Metric store connection url: " + url);
+ try {
+ return DriverManager.getConnection(url);
+ } catch (SQLException e) {
+ LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+
+ throw e;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
new file mode 100644
index 0000000..636999f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Encapsulate all metrics related SQL queries.
+ */
+public class PhoenixTransactSQL {
+
  public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);

  // DDL templates below carry three %s placeholders each: block encoding,
  // TTL and compression — presumably filled in by the accessor at table
  // creation time (TODO confirm against PhoenixHBaseAccessor).

  /**
   * Create table to store individual metric records.
   */
  public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
    "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " +
    "HOSTNAME VARCHAR, " +
    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
    "APP_ID VARCHAR, " +
    "INSTANCE_ID VARCHAR, " +
    "START_TIME UNSIGNED_LONG, " +
    "UNITS CHAR(20), " +
    "METRIC_SUM DOUBLE, " +
    "METRIC_COUNT UNSIGNED_INT, " +
    "METRIC_MAX DOUBLE, " +
    "METRIC_MIN DOUBLE, " +
    "METRICS VARCHAR CONSTRAINT pk " +
    "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " +
    "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
    "TTL=%s, COMPRESSION='%s'";

  /** Per-host hourly roll-up of metric records. */
  public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
      "(METRIC_NAME VARCHAR, " +
      "HOSTNAME VARCHAR, " +
      "APP_ID VARCHAR, " +
      "INSTANCE_ID VARCHAR, " +
      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
      "UNITS CHAR(20), " +
      "METRIC_SUM DOUBLE," +
      "METRIC_COUNT UNSIGNED_INT, " +
      "METRIC_MAX DOUBLE," +
      "METRIC_MIN DOUBLE CONSTRAINT pk " +
      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
      "TTL=%s, COMPRESSION='%s'";

  /** Per-host minute roll-up of metric records. */
  public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
      "(METRIC_NAME VARCHAR, " +
      "HOSTNAME VARCHAR, " +
      "APP_ID VARCHAR, " +
      "INSTANCE_ID VARCHAR, " +
      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
      "UNITS CHAR(20), " +
      "METRIC_SUM DOUBLE," +
      "METRIC_COUNT UNSIGNED_INT, " +
      "METRIC_MAX DOUBLE," +
      "METRIC_MIN DOUBLE CONSTRAINT pk " +
      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
      " COMPRESSION='%s'";

  /** Cluster-wide aggregate (no HOSTNAME column; tracks HOSTS_COUNT). */
  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
      "(METRIC_NAME VARCHAR, " +
      "APP_ID VARCHAR, " +
      "INSTANCE_ID VARCHAR, " +
      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
      "UNITS CHAR(20), " +
      "METRIC_SUM DOUBLE, " +
      "HOSTS_COUNT UNSIGNED_INT, " +
      "METRIC_MAX DOUBLE, " +
      "METRIC_MIN DOUBLE " +
      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
      "TTL=%s, COMPRESSION='%s'";

  /** Hourly cluster-wide aggregate (METRIC_COUNT instead of HOSTS_COUNT). */
  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
      "(METRIC_NAME VARCHAR, " +
      "APP_ID VARCHAR, " +
      "INSTANCE_ID VARCHAR, " +
      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
      "UNITS CHAR(20), " +
      "METRIC_SUM DOUBLE, " +
      "METRIC_COUNT UNSIGNED_INT, " +
      "METRIC_MAX DOUBLE, " +
      "METRIC_MIN DOUBLE " +
      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
      "TTL=%s, COMPRESSION='%s'";

  /**
   * ALTER table to set new options
   */
  public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s";

  /**
   * Insert into metric records table.
   */
  public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
    "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " +
    "UNITS, " +
    "METRIC_SUM, " +
    "METRIC_MAX, " +
    "METRIC_MIN, " +
    "METRIC_COUNT, " +
    "METRICS) VALUES " +
    "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

  /** Upsert into the cluster aggregate table (fixed table name). */
  public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
    "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
    "UNITS, " +
    "METRIC_SUM, " +
    "HOSTS_COUNT, " +
    "METRIC_MAX, " +
    "METRIC_MIN) " +
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";

  /** Upsert into a time-based cluster aggregate table (%s = table name). */
  public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" +
    " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
    "UNITS, " +
    "METRIC_SUM, " +
    "METRIC_COUNT, " +
    "METRIC_MAX, " +
    "METRIC_MIN) " +
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";


  /** Upsert into a per-host aggregate table (%s = table name). */
  public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
    "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
    "SERVER_TIME, " +
    "UNITS, " +
    "METRIC_SUM, " +
    "METRIC_MAX, " +
    "METRIC_MIN," +
    "METRIC_COUNT) " +
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

  // SELECT templates: first %s is an optional query hint, second %s the table.

  /**
   * Retrieve a set of rows from metrics records table.
   */
  public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " +
    "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " +
    "METRIC_SUM, " +
    "METRIC_MAX, " +
    "METRIC_MIN, " +
    "METRIC_COUNT, " +
    "METRICS " +
    "FROM %s";

  /** Same as GET_METRIC_SQL but without the serialized METRICS column. */
  public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " +
    "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
    "UNITS, " +
    "METRIC_SUM, " +
    "METRIC_MAX, " +
    "METRIC_MIN, " +
    "METRIC_COUNT " +
    "FROM %s";

  /** Read cluster aggregates (HOSTS_COUNT variant). */
  public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " +
    "METRIC_NAME, APP_ID, " +
    "INSTANCE_ID, SERVER_TIME, " +
    "UNITS, " +
    "METRIC_SUM, " +
    "HOSTS_COUNT, " +
    "METRIC_MAX, " +
    "METRIC_MIN " +
    "FROM %s";

  /** Read hourly cluster aggregates (METRIC_COUNT variant). */
  public static final String GET_CLUSTER_AGGREGATE_HOURLY_SQL = "SELECT %s " +
    "METRIC_NAME, APP_ID, " +
    "INSTANCE_ID, SERVER_TIME, " +
    "UNITS, " +
    "METRIC_SUM, " +
    "METRIC_COUNT, " +
    "METRIC_MAX, " +
    "METRIC_MIN " +
    "FROM %s";

  // Physical table names and storage defaults.
  public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
  public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
    "METRIC_RECORD_MINUTE";
  public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
    "METRIC_RECORD_HOURLY";
  public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
    "METRIC_AGGREGATE";
  public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
    "METRIC_AGGREGATE_HOURLY";
  public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
  public static final String DEFAULT_ENCODING = "FAST_DIFF";
  public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
  public static final long HOUR = 3600000; // 1 hour
  public static final long DAY = 86400000; // 1 day
+
+ /**
+ * Filter to optimize HBase scan by using file timestamps. This prevents
+ * a full table scan of metric records.
+ *
+ * @return Phoenix Hint String
+ */
+ public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
+ return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta));
+ }
+
  /**
   * Builds and parameterizes a SELECT over the metric tables for the given
   * condition. When no precision is requested, one is chosen from the time
   * range: > 5 days -> hourly table, > 10 hours -> minute table, else the
   * raw record table. Bind order must match the placeholder order emitted by
   * Condition.getConditionClause(): metric names, hostname, appId,
   * instanceId, start time, end time.
   *
   * @throws SQLException             from statement preparation
   * @throws IllegalArgumentException when the condition is empty or the
   *                                  requested row count exceeds the limit
   */
  public static PreparedStatement prepareGetMetricsSqlStmt(
    Connection connection, Condition condition) throws SQLException {

    validateConditionIsNotEmpty(condition);
    validateRowCountLimit(condition);

    String stmtStr;
    if (condition.getStatement() != null) {
      // Caller supplied the full SQL; only the WHERE clause is appended.
      stmtStr = condition.getStatement();
    } else {

      String metricsTable;
      String query;
      if (condition.getPrecision() == null) {
        // Infer precision from the requested time range and record it back
        // on the condition so callers can see which table was used.
        long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime();
        long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime();
        Long timeRange = endTime - startTime;
        if (timeRange > 5 * DAY) {
          metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
          query = GET_METRIC_AGGREGATE_ONLY_SQL;
          condition.setPrecision(Precision.HOURS);
        } else if (timeRange > 10 * HOUR) {
          metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
          query = GET_METRIC_AGGREGATE_ONLY_SQL;
          condition.setPrecision(Precision.MINUTES);
        } else {
          metricsTable = METRICS_RECORD_TABLE_NAME;
          query = GET_METRIC_SQL;
          condition.setPrecision(Precision.SECONDS);
        }
      } else {
        switch (condition.getPrecision()) {
          case HOURS:
            metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
            query = GET_METRIC_AGGREGATE_ONLY_SQL;
            break;
          case MINUTES:
            metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
            query = GET_METRIC_AGGREGATE_ONLY_SQL;
            break;
          default:
            metricsTable = METRICS_RECORD_TABLE_NAME;
            query = GET_METRIC_SQL;
        }
      }

      // NOTE(review): getStartTime() may be null here and
      // getNaiveTimeRangeHint would NPE on unboxing — confirm callers
      // always supply a start time when no explicit statement is set.
      stmtStr = String.format(query,
        getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA),
        metricsTable);
    }

    StringBuilder sb = new StringBuilder(stmtStr);
    sb.append(" WHERE ");
    sb.append(condition.getConditionClause());
    String orderByClause = condition.getOrderByClause(true);

    if (orderByClause != null) {
      sb.append(orderByClause);
    } else {
      sb.append(" ORDER BY METRIC_NAME, SERVER_TIME ");
    }
    if (condition.getLimit() != null) {
      sb.append(" LIMIT ").append(condition.getLimit());
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
    }
    PreparedStatement stmt = connection.prepareStatement(sb.toString());
    // Bind parameters in the same order the placeholders were emitted.
    int pos = 1;
    if (condition.getMetricNames() != null) {
      for (; pos <= condition.getMetricNames().size(); pos++) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
        }
        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
      }
    }
    if (condition.getHostname() != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
      }
      stmt.setString(pos++, condition.getHostname());
    }
    if (condition.getAppId() != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
      }
      stmt.setString(pos++, condition.getAppId());
    }
    if (condition.getInstanceId() != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
      }
      stmt.setString(pos++, condition.getInstanceId());
    }
    if (condition.getStartTime() != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
      }
      stmt.setLong(pos++, condition.getStartTime());
    }
    if (condition.getEndTime() != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
      }
      stmt.setLong(pos, condition.getEndTime());
    }
    if (condition.getFetchSize() != null) {
      stmt.setFetchSize(condition.getFetchSize());
    }

    return stmt;
  }
+
  /**
   * Rejects queries with no filter criteria at all.
   *
   * @throws IllegalArgumentException when the condition is empty
   */
  private static void validateConditionIsNotEmpty(Condition condition) {
    if (condition.isEmpty()) {
      throw new IllegalArgumentException("Condition is empty.");
    }
  }
+
+ private static void validateRowCountLimit(Condition condition) {
+ if (condition.getMetricNames() == null
+ || condition.getMetricNames().size() ==0 ) {
+ //aggregator can use empty metrics query
+ return;
+ }
+
+ long range = condition.getEndTime() - condition.getStartTime();
+ long rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range) + 1;
+
+ Precision precision = condition.getPrecision();
+ // for minutes and seconds we can use the rowsPerMetric computed based on
+ // minutes
+ if (precision != null && precision == Precision.HOURS) {
+ rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range) + 1;
+ }
+
+ long totalRowsRequested = rowsPerMetric * condition.getMetricNames().size();
+ if (totalRowsRequested > PhoenixHBaseAccessor.RESULTSET_LIMIT) {
+ throw new IllegalArgumentException("The time range query for " +
+ "precision table exceeds row count limit, please query aggregate " +
+ "table instead.");
+ }
+ }
+
  /**
   * Builds a point-in-time (latest value) query against the raw metric
   * record table: rows are ordered newest-first and limited to one row per
   * requested metric name. Bind order mirrors the placeholders emitted by
   * Condition.getConditionClause(): metric names, hostname, appId,
   * instanceId.
   *
   * @throws SQLException             from statement preparation
   * @throws IllegalArgumentException when the condition is empty or has no
   *                                  metric names
   */
  public static PreparedStatement prepareGetLatestMetricSqlStmt(
    Connection connection, Condition condition) throws SQLException {

    validateConditionIsNotEmpty(condition);

    if (condition.getMetricNames() == null
      || condition.getMetricNames().size() == 0) {
      throw new IllegalArgumentException("Point in time query without " +
        "metric names not supported ");
    }

    String stmtStr;
    if (condition.getStatement() != null) {
      stmtStr = condition.getStatement();
    } else {
      // No scan hint ("" placeholder) since there is no time range.
      stmtStr = String.format(GET_METRIC_SQL,
        "",
        METRICS_RECORD_TABLE_NAME);
    }

    StringBuilder sb = new StringBuilder(stmtStr);
    sb.append(" WHERE ");
    sb.append(condition.getConditionClause());
    String orderByClause = condition.getOrderByClause(false);
    if (orderByClause != null) {
      sb.append(orderByClause);
    } else {
      // Newest rows first, so the LIMIT below keeps the latest values.
      sb.append(" ORDER BY METRIC_NAME DESC, HOSTNAME DESC, SERVER_TIME DESC ");
    }

    sb.append(" LIMIT ").append(condition.getMetricNames().size());

    if (LOG.isDebugEnabled()) {
      LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
    }
    PreparedStatement stmt = connection.prepareStatement(sb.toString());
    int pos = 1;
    if (condition.getMetricNames() != null) {
      //IGNORE condition limit, set one based on number of metric names
      for (; pos <= condition.getMetricNames().size(); pos++) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
        }
        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
      }
    }
    if (condition.getHostname() != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
      }
      stmt.setString(pos++, condition.getHostname());
    }
    if (condition.getAppId() != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
      }
      stmt.setString(pos++, condition.getAppId());
    }
    if (condition.getInstanceId() != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
      }
      stmt.setString(pos++, condition.getInstanceId());
    }

    if (condition.getFetchSize() != null) {
      stmt.setFetchSize(condition.getFetchSize());
    }

    return stmt;
  }
+
/**
 * Builds and fully binds a range query against the cluster aggregate
 * tables, choosing the table and SQL template from the requested (or
 * inferred) precision: ranges over five days fall back to the hourly
 * aggregate table, otherwise the per-second aggregate table is used.
 *
 * @param connection open JDBC/Phoenix connection; the caller owns its lifecycle
 * @param condition  non-empty condition; its precision is set as a side
 *                   effect when it was null on entry
 * @return a prepared statement with all parameters bound, ready to execute
 * @throws SQLException if statement preparation or parameter binding fails
 * @throws IllegalArgumentException if the condition is empty
 */
public static PreparedStatement prepareGetAggregateSqlStmt(
  Connection connection, Condition condition) throws SQLException {

  validateConditionIsNotEmpty(condition);

  String metricsAggregateTable;
  String queryStmt;
  if (condition.getPrecision() == null) {
    // No precision requested: infer it from the time range and record the
    // choice back on the condition so callers see what was used.
    long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime();
    long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime();
    Long timeRange = endTime - startTime;
    if (timeRange > 5 * DAY) {
      metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
      queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL;
      condition.setPrecision(Precision.HOURS);
    } else {
      metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
      queryStmt = GET_CLUSTER_AGGREGATE_SQL;
      condition.setPrecision(Precision.SECONDS);
    }
  } else {
    // Explicit precision: HOURS maps to the hourly table, everything else
    // (SECONDS, MINUTES, ...) to the per-second aggregate table.
    switch (condition.getPrecision()) {
      case HOURS:
        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
        queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL;
        break;
      default:
        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
        queryStmt = GET_CLUSTER_AGGREGATE_SQL;
    }
  }

  StringBuilder sb = new StringBuilder(queryStmt);
  sb.append(" WHERE ");
  sb.append(condition.getConditionClause());
  sb.append(" ORDER BY METRIC_NAME, SERVER_TIME");
  if (condition.getLimit() != null) {
    sb.append(" LIMIT ").append(condition.getLimit());
  }

  // NOTE(review): the SQL template is assumed to take (hint, table) format
  // arguments, and getNaiveTimeRangeHint is called with a possibly-null
  // start time -- confirm it tolerates null.
  String query = String.format(sb.toString(),
    PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(),
      NATIVE_TIME_RANGE_DELTA), metricsAggregateTable);
  if (LOG.isDebugEnabled()) {
    LOG.debug("SQL => " + query + ", condition => " + condition);
  }
  PreparedStatement stmt = connection.prepareStatement(query);
  // Bind order must match the placeholder order produced by
  // getConditionClause(): metric names, appId, instanceId, start, end.
  int pos = 1;
  if (condition.getMetricNames() != null) {
    for (; pos <= condition.getMetricNames().size(); pos++) {
      stmt.setString(pos, condition.getMetricNames().get(pos - 1));
    }
  }
  // TODO: Upper case all strings on POST
  if (condition.getAppId() != null) {
    stmt.setString(pos++, condition.getAppId());
  }
  if (condition.getInstanceId() != null) {
    stmt.setString(pos++, condition.getInstanceId());
  }
  if (condition.getStartTime() != null) {
    stmt.setLong(pos++, condition.getStartTime());
  }
  if (condition.getEndTime() != null) {
    stmt.setLong(pos, condition.getEndTime());
  }

  return stmt;
}
+
+ public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt(
+ Connection connection, Condition condition) throws SQLException {
+
+ validateConditionIsNotEmpty(condition);
+
+ String stmtStr;
+ if (condition.getStatement() != null) {
+ stmtStr = condition.getStatement();
+ } else {
+ stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, "",
+ METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
+ }
+
+ StringBuilder sb = new StringBuilder(stmtStr);
+ sb.append(" WHERE ");
+ sb.append(condition.getConditionClause());
+ String orderByClause = condition.getOrderByClause(false);
+ if (orderByClause != null) {
+ sb.append(orderByClause);
+ } else {
+ sb.append(" ORDER BY METRIC_NAME DESC, SERVER_TIME DESC ");
+ }
+
+ sb.append(" LIMIT ").append(condition.getMetricNames().size());
+
+ String query = sb.toString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SQL: " + query + ", condition: " + condition);
+ }
+
+ PreparedStatement stmt = connection.prepareStatement(query);
+ int pos = 1;
+ if (condition.getMetricNames() != null) {
+ for (; pos <= condition.getMetricNames().size(); pos++) {
+ stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+ }
+ }
+ if (condition.getAppId() != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
+ }
+ stmt.setString(pos++, condition.getAppId());
+ }
+ if (condition.getInstanceId() != null) {
+ stmt.setString(pos++, condition.getInstanceId());
+ }
+
+ return stmt;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
new file mode 100644
index 0000000..00d6a82
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.util.Collections;
+import java.util.List;
+
+public class SplitByMetricNamesCondition implements Condition {
+ private final Condition adaptee;
+ private String currentMetric;
+
+ public SplitByMetricNamesCondition(Condition condition){
+ this.adaptee = condition;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return adaptee.isEmpty();
+ }
+
+ @Override
+ public List<String> getMetricNames() {
+ return Collections.singletonList(currentMetric);
+ }
+
+ @Override
+ public boolean isPointInTime() {
+ return adaptee.isPointInTime();
+ }
+
+ @Override
+ public boolean isGrouped() {
+ return adaptee.isGrouped();
+ }
+
+ @Override
+ public void setStatement(String statement) {
+ adaptee.setStatement(statement);
+ }
+
+ @Override
+ public String getHostname() {
+ return adaptee.getHostname();
+ }
+
+ @Override
+ public Precision getPrecision() {
+ return adaptee.getPrecision();
+ }
+
+ @Override
+ public void setPrecision(Precision precision) {
+ adaptee.setPrecision(precision);
+ }
+
+ @Override
+ public String getAppId() {
+ return adaptee.getAppId();
+ }
+
+ @Override
+ public String getInstanceId() {
+ return adaptee.getInstanceId();
+ }
+
+ @Override
+ public StringBuilder getConditionClause() {
+ StringBuilder sb = new StringBuilder();
+ boolean appendConjunction = false;
+
+ if (getMetricNames() != null) {
+ for (String name : getMetricNames()) {
+ if (sb.length() > 1) {
+ sb.append(" OR ");
+ }
+ sb.append("METRIC_NAME = ?");
+ }
+
+ appendConjunction = true;
+ }
+
+ appendConjunction = DefaultCondition.append(sb, appendConjunction,
+ getHostname(), " HOSTNAME = ?");
+ appendConjunction = DefaultCondition.append(sb, appendConjunction,
+ getAppId(), " APP_ID = ?");
+ appendConjunction = DefaultCondition.append(sb, appendConjunction,
+ getInstanceId(), " INSTANCE_ID = ?");
+ appendConjunction = DefaultCondition.append(sb, appendConjunction,
+ getStartTime(), " SERVER_TIME >= ?");
+ DefaultCondition.append(sb, appendConjunction, getEndTime(),
+ " SERVER_TIME < ?");
+
+ return sb;
+ }
+
+ @Override
+ public String getOrderByClause(boolean asc) {
+ return adaptee.getOrderByClause(asc);
+ }
+
+ @Override
+ public String getStatement() {
+ return adaptee.getStatement();
+ }
+
+ @Override
+ public Long getStartTime() {
+ return adaptee.getStartTime();
+ }
+
+ @Override
+ public Long getEndTime() {
+ return adaptee.getEndTime();
+ }
+
+ @Override
+ public Integer getLimit() {
+ return adaptee.getLimit();
+ }
+
+ @Override
+ public Integer getFetchSize() {
+ return adaptee.getFetchSize();
+ }
+
+ @Override
+ public void setFetchSize(Integer fetchSize) {
+ adaptee.setFetchSize(fetchSize);
+ }
+
+ @Override
+ public void addOrderByColumn(String column) {
+ adaptee.addOrderByColumn(column);
+ }
+
+ @Override
+ public void setNoLimit() {
+ adaptee.setNoLimit();
+ }
+
+ public List<String> getOriginalMetricNames() {
+ return adaptee.getMetricNames();
+ }
+
+ public void setCurrentMetric(String currentMetric) {
+ this.currentMetric = currentMetric;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
index 3720852..e1d256d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
@@ -24,8 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
.timeline.PhoenixHBaseAccessor;
import org.apache.zookeeper.ClientCnxn;
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index b11a977..90c03e4 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
@@ -37,7 +38,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.LOG;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.assertj.core.api.Assertions.assertThat;
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
index 969192d..c22e734 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.hadoop.yarn.util.Clock;
import org.junit.Before;
import org.junit.Test;
@@ -59,7 +62,7 @@ public class AbstractTimelineAggregatorTest {
agg = new AbstractTimelineAggregator(
null, metricsConf, clock) {
@Override
- protected boolean doWork(long startTime, long endTime) {
+ public boolean doWork(long startTime, long endTime) {
startTimeInDoWork.set(startTime);
endTimeInDoWork.set(endTime);
actualRuns++;
@@ -68,7 +71,7 @@ public class AbstractTimelineAggregatorTest {
}
@Override
- protected PhoenixTransactSQL.Condition
+ protected Condition
prepareMetricQueryCondition(long startTime, long endTime) {
return null;
}
@@ -89,7 +92,7 @@ public class AbstractTimelineAggregatorTest {
}
@Override
- protected boolean isDisabled() {
+ public boolean isDisabled() {
return false;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
index 2a389ac..af9c6bb 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
@@ -17,11 +17,12 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.junit.Test;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.fromMetricName;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.ReadFunction.AVG;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.PostProcessingFunction.RATE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.fromMetricName;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
import static org.assertj.core.api.Assertions.assertThat;
public class FunctionTest {