Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/02 18:28:32 UTC

[16/30] ambari git commit: AMBARI-5707. Replace Ganglia with a highly performant and pluggable Metrics System. (swagle)

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
new file mode 100644
index 0000000..652c492
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class DefaultPhoenixDataSource implements ConnectionProvider {
+
+  static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
+  private static final String ZOOKEEPER_CLIENT_PORT =
+    "hbase.zookeeper.property.clientPort";
+  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+  private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
+  private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
+  private final String url;
+
+  public DefaultPhoenixDataSource(Configuration hbaseConf) {
+    String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT,
+      "2181");
+    String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
+    String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase");
+    if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+      throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+        "access HBase store using Phoenix.");
+    }
+
+    url = String.format(connectionUrl,
+      zookeeperQuorum,
+      zookeeperClientPort,
+      znodeParent);
+  }
+
+  /**
+   * Get a JDBC connection to the HBase store. The assumption is that the
+   * HBase configuration is present on the classpath and has been loaded by
+   * the caller into the Configuration object. Phoenix already caches the
+   * HConnection between the client and the HBase cluster.
+   *
+   * @return a {@link java.sql.Connection} to the metric store
+   */
+  public Connection getConnection() throws SQLException {
+
+    LOG.debug("Metric store connection url: " + url);
+    try {
+      return DriverManager.getConnection(url);
+    } catch (SQLException e) {
+      LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+
+      throw e;
+    }
+  }
+
+}

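For reviewers, a minimal usage sketch of the provider above; it assumes hbase-site.xml is on the classpath and that the METRIC_RECORD table (created by PhoenixHBaseAccessor later in this patch) already exists:

    import org.apache.hadoop.conf.Configuration;

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class DataSourceSketch {
      public static void main(String[] args) throws SQLException {
        // hbase-site.xml supplies hbase.zookeeper.quorum, client port, znode parent
        Configuration hbaseConf = new Configuration(true);
        hbaseConf.addResource("hbase-site.xml");

        DefaultPhoenixDataSource dataSource = new DefaultPhoenixDataSource(hbaseConf);
        Connection conn = dataSource.getConnection();
        try {
          Statement stmt = conn.createStatement();
          ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM METRIC_RECORD");
          if (rs.next()) {
            System.out.println("Metric rows: " + rs.getLong(1));
          }
        } finally {
          conn.close();
        }
      }
    }
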
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
new file mode 100644
index 0000000..9364187
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+import java.io.IOException;
+import java.net.URL;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
+
+public class HBaseTimelineMetricStore extends AbstractService
+    implements TimelineMetricStore {
+
+  static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class);
+  private PhoenixHBaseAccessor hBaseAccessor;
+
+  /**
+   * Construct the service.
+   */
+  public HBaseTimelineMetricStore() {
+    super(HBaseTimelineMetricStore.class.getName());
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    if (classLoader == null) {
+      classLoader = getClass().getClassLoader();
+    }
+    URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE);
+    URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+    LOG.info("Found hbase site configuration: " + hbaseResUrl);
+    LOG.info("Found metric service configuration: " + amsResUrl);
+
+    if (hbaseResUrl == null) {
+      throw new IllegalStateException("Unable to initialize the metrics " +
+        "subsystem. No hbase-site present in the classpath.");
+    }
+
+    if (amsResUrl == null) {
+      throw new IllegalStateException("Unable to initialize the metrics " +
+        "subsystem. No ams-site present in the classpath.");
+    }
+
+    Configuration hbaseConf = new Configuration(true);
+    hbaseConf.addResource(hbaseResUrl.toURI().toURL());
+    Configuration metricsConf = new Configuration(true);
+    metricsConf.addResource(amsResUrl.toURI().toURL());
+
+    initializeSubsystem(hbaseConf, metricsConf);
+  }
+
+  private void initializeSubsystem(Configuration hbaseConf,
+                                   Configuration metricsConf) {
+    hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
+    hBaseAccessor.initMetricSchema();
+
+    // Start the cluster aggregator
+    TimelineMetricClusterAggregator minuteClusterAggregator =
+      new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf);
+    if (!minuteClusterAggregator.isDisabled()) {
+      Thread aggregatorThread = new Thread(minuteClusterAggregator);
+      aggregatorThread.start();
+    }
+
+    // Start the cluster aggregator hourly
+    TimelineMetricClusterAggregatorHourly hourlyClusterAggregator =
+      new TimelineMetricClusterAggregatorHourly(hBaseAccessor, metricsConf);
+    if (!hourlyClusterAggregator.isDisabled()) {
+      Thread aggregatorThread = new Thread(hourlyClusterAggregator);
+      aggregatorThread.start();
+    }
+
+    // Start the 5 minute aggregator
+    TimelineMetricAggregator minuteHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute
+        (hBaseAccessor, metricsConf);
+    if (!minuteHostAggregator.isDisabled()) {
+      Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
+      minuteAggregatorThread.start();
+    }
+
+    // Start hourly host aggregator
+    TimelineMetricAggregator hourlyHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly
+        (hBaseAccessor, metricsConf);
+    if (!hourlyHostAggregator.isDisabled()) {
+      Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
+      aggregatorHourlyThread.start();
+    }
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  // TODO: update to work with HOSTS_COUNT and METRIC_COUNT
+  @Override
+  public TimelineMetrics getTimelineMetrics(List<String> metricNames,
+      String hostname, String applicationId, String instanceId,
+      Long startTime, Long endTime, Integer limit,
+      boolean groupedByHosts) throws SQLException, IOException {
+
+    Condition condition = new Condition(metricNames, hostname, applicationId,
+      instanceId, startTime, endTime, limit, groupedByHosts);
+
+    if (hostname == null) {
+      return hBaseAccessor.getAggregateMetricRecords(condition);
+    }
+
+    return hBaseAccessor.getMetricRecords(condition);
+  }
+
+  @Override
+  public TimelineMetric getTimelineMetric(String metricName, String hostname,
+      String applicationId, String instanceId, Long startTime,
+      Long endTime, Integer limit)
+      throws SQLException, IOException {
+
+    TimelineMetrics metrics = hBaseAccessor.getMetricRecords(
+      new Condition(Collections.singletonList(metricName), hostname,
+        applicationId, instanceId, startTime, endTime, limit, true)
+    );
+
+    TimelineMetric metric = new TimelineMetric();
+    List<TimelineMetric> metricList = metrics.getMetrics();
+
+    if (metricList != null && !metricList.isEmpty()) {
+      metric.setMetricName(metricList.get(0).getMetricName());
+      metric.setAppId(metricList.get(0).getAppId());
+      metric.setInstanceId(metricList.get(0).getInstanceId());
+      metric.setHostName(metricList.get(0).getHostName());
+      // Assumption that metrics are ordered by start time
+      metric.setStartTime(metricList.get(0).getStartTime());
+      Map<Long, Double> metricRecords = new HashMap<Long, Double>();
+      for (TimelineMetric timelineMetric : metricList) {
+        metricRecords.putAll(timelineMetric.getMetricValues());
+      }
+      metric.setMetricValues(metricRecords);
+    }
+
+    return metric;
+  }
+
+
+  @Override
+  public TimelinePutResponse putMetrics(TimelineMetrics metrics)
+    throws SQLException, IOException {
+
+    // Errors are indicated by the SQLException thrown to the caller.
+    TimelinePutResponse response = new TimelinePutResponse();
+
+    hBaseAccessor.insertMetricRecords(metrics);
+
+    return response;
+  }
+}

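A rough lifecycle sketch for the store above. The metric and host names are illustrative, and hbase-site.xml/ams-site.xml must be on the classpath for serviceInit to succeed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
    import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

    import java.util.HashMap;
    import java.util.Map;

    public class StoreLifecycleSketch {
      public static void main(String[] args) throws Exception {
        HBaseTimelineMetricStore store = new HBaseTimelineMetricStore();
        store.init(new Configuration());  // serviceInit loads hbase-site.xml and ams-site.xml
        store.start();

        TimelineMetric metric = new TimelineMetric();
        metric.setMetricName("cpu_user");             // illustrative
        metric.setHostName("host1.example.com");      // illustrative
        metric.setAppId("HOST");
        long now = System.currentTimeMillis();
        metric.setStartTime(now);
        Map<Long, Double> values = new HashMap<Long, Double>();
        values.put(now, 12.5);
        metric.setMetricValues(values);

        TimelineMetrics metrics = new TimelineMetrics();
        metrics.getMetrics().add(metric);
        store.putMetrics(metrics);

        // Read the series back for the last hour.
        TimelineMetric series = store.getTimelineMetric("cpu_user",
          "host1.example.com", "HOST", null, now - 3600000L, now, null);
        System.out.println(series.getMetricValues());
      }
    }
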
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
new file mode 100644
index 0000000..61e15d7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Base class for metric aggregates; tracks a running sum, deviation,
+ * max and min of a metric.
+ */
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+  @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  protected Double sum = 0.0;
+  protected Double deviation;
+  protected Double max = Double.MIN_VALUE;
+  protected Double min = Double.MAX_VALUE;
+
+  public MetricAggregate() {
+  }
+
+  MetricAggregate(Double sum, Double deviation, Double max,
+                  Double min) {
+    this.sum = sum;
+    this.deviation = deviation;
+    this.max = max;
+    this.min = min;
+  }
+
+  void updateSum(Double sum) {
+    this.sum += sum;
+  }
+
+  void updateMax(Double max) {
+    if (max > this.max) {
+      this.max = max;
+    }
+  }
+
+  void updateMin(Double min) {
+    if (min < this.min) {
+      this.min = min;
+    }
+  }
+
+  @JsonProperty("sum")
+  Double getSum() {
+    return sum;
+  }
+
+  @JsonProperty("deviation")
+  Double getDeviation() {
+    return deviation;
+  }
+
+  @JsonProperty("max")
+  Double getMax() {
+    return max;
+  }
+
+  @JsonProperty("min")
+  Double getMin() {
+    return min;
+  }
+
+  public void setSum(Double sum) {
+    this.sum = sum;
+  }
+
+  public void setDeviation(Double deviation) {
+    this.deviation = deviation;
+  }
+
+  public void setMax(Double max) {
+    this.max = max;
+  }
+
+  public void setMin(Double min) {
+    this.min = min;
+  }
+
+  public String toJSON() throws IOException {
+    return mapper.writeValueAsString(this);
+  }
+}

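A small sketch of how the running aggregate composes; the update methods are package-private, so this assumes same-package access, and the values are illustrative:

    MetricAggregate agg = new MetricAggregate();
    agg.updateSum(10.0);  // sum:  0.0 -> 10.0
    agg.updateMax(10.0);  // max:  Double.MIN_VALUE -> 10.0
    agg.updateMin(10.0);  // min:  Double.MAX_VALUE -> 10.0

    agg.updateSum(4.0);   // sum:  10.0 -> 14.0
    agg.updateMax(4.0);   // max stays 10.0
    agg.updateMin(4.0);   // min:  10.0 -> 4.0

    String json = agg.toJSON();  // serializes sum, deviation, max and min (throws IOException)
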
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
new file mode 100644
index 0000000..c13c85f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Cluster-wide aggregate of a metric across the hosts that reported it.
+ */
+public class MetricClusterAggregate extends MetricAggregate {
+  private int numberOfHosts;
+
+  @JsonCreator
+  public MetricClusterAggregate() {
+  }
+
+  MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+                         Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  @JsonProperty("numberOfHosts")
+  int getNumberOfHosts() {
+    return numberOfHosts;
+  }
+
+  void updateNumberOfHosts(int count) {
+    this.numberOfHosts += count;
+  }
+
+  public void setNumberOfHosts(int numberOfHosts) {
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  /**
+   * Merge another aggregate into this one: update max, min, sum and the
+   * number of reporting hosts.
+   */
+  void updateAggregates(MetricClusterAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricClusterAggregate{" +
+      "sum=" + sum +
+      ", numberOfHosts=" + numberOfHosts +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}

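For example, folding a second per-host slice into a running cluster aggregate (package-private constructor and merge, so same-package access is assumed):

    MetricClusterAggregate total =
      new MetricClusterAggregate(3.0, 1, 0.0, 3.0, 3.0);   // one host reported 3.0
    MetricClusterAggregate slice =
      new MetricClusterAggregate(7.0, 2, 0.0, 5.0, 2.0);   // two hosts, sum 7.0
    total.updateAggregates(slice);
    // total: sum=10.0, numberOfHosts=3, max=5.0, min=2.0
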
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
new file mode 100644
index 0000000..02cc207
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents a roll-up of minute-based aggregate values to a resolution
+ * greater than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+  private long numberOfSamples = 0;
+
+  @JsonCreator
+  public MetricHostAggregate() {
+    super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
+  }
+
+  public MetricHostAggregate(Double sum, int numberOfSamples,
+                             Double deviation,
+                             Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  @JsonProperty("numberOfSamples")
+  long getNumberOfSamples() {
+    return numberOfSamples == 0 ? 1 : numberOfSamples;
+  }
+
+  void updateNumberOfSamples(long count) {
+    this.numberOfSamples += count;
+  }
+
+  public void setNumberOfSamples(long numberOfSamples) {
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  public double getAvg() {
+    return sum / numberOfSamples;
+  }
+
+  /**
+   * Merge another aggregate into this one: update max, min, sum and the
+   * number of samples.
+   */
+  void updateAggregates(MetricHostAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfSamples(hostAggregate.getNumberOfSamples());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricHostAggregate{" +
+      "sum=" + sum +
+      ", numberOfSamples=" + numberOfSamples +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}

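A sketch of that roll-up, merging two minute-level aggregates into an hourly one (updateAggregates is package-private, so same-package access is assumed; values are illustrative):

    MetricHostAggregate hourly = new MetricHostAggregate();
    hourly.updateAggregates(new MetricHostAggregate(12.0, 4, 0.0, 5.0, 1.0));
    hourly.updateAggregates(new MetricHostAggregate(8.0, 4, 0.0, 4.0, 0.5));
    // hourly: sum=20.0, numberOfSamples=8, max=5.0, min=0.5
    double avg = hourly.getAvg();  // 20.0 / 8 = 2.5
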
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
new file mode 100644
index 0000000..88a427a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+/**
+ * RuntimeException thrown during initialization of the metrics schema. It is
+ * a RuntimeException since the situation is not recoverable; it should be
+ * handled by the main or service method, followed by shutdown.
+ */
+public class MetricsInitializationException extends RuntimeException {
+  public MetricsInitializationException() {
+  }
+
+  public MetricsInitializationException(String msg) {
+    super(msg);
+  }
+
+  public MetricsInitializationException(Throwable t) {
+    super(t);
+  }
+
+  public MetricsInitializationException(String msg, Throwable t) {
+    super(msg, t);
+  }
+
+}

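A sketch of the intended handling at a service entry point; the accessor variable and LOG are assumed to be in scope:

    try {
      accessor.initMetricSchema();
    } catch (MetricsInitializationException e) {
      // Unrecoverable: log and shut the service down rather than continue.
      LOG.fatal("Could not initialize the metrics schema.", e);
      System.exit(1);
    }
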
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
new file mode 100644
index 0000000..4f248b7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
+
+/**
+ * Provides a facade over the Phoenix API to access the HBase schema.
+ */
+public class PhoenixHBaseAccessor {
+
+  private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
+  private final Configuration hbaseConf;
+  private final Configuration metricsConf;
+  private final RetryCounterFactory retryCounterFactory;
+
+  static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
+  /**
+   * 4 metrics/min * 60 * 24: Retrieve data for 1 day.
+   */
+  private static final int METRICS_PER_MINUTE = 4;
+  public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) *
+    METRICS_PER_MINUTE;
+  private static ObjectMapper mapper = new ObjectMapper();
+
+  private static TypeReference<Map<Long, Double>> metricValuesTypeRef =
+    new TypeReference<Map<Long, Double>>() {};
+  private final ConnectionProvider dataSource;
+
+  public PhoenixHBaseAccessor(Configuration hbaseConf,
+                              Configuration metricsConf) {
+    this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf));
+  }
+
+  public PhoenixHBaseAccessor(Configuration hbaseConf,
+                              Configuration metricsConf,
+                              ConnectionProvider dataSource) {
+    this.hbaseConf = hbaseConf;
+    this.metricsConf = metricsConf;
+    RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, 5760);
+    try {
+      Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+    } catch (ClassNotFoundException e) {
+      LOG.error("Phoenix client jar not found in the classpath.", e);
+      throw new IllegalStateException(e);
+    }
+    this.dataSource = dataSource;
+    this.retryCounterFactory = new RetryCounterFactory(
+      metricsConf.getInt(GLOBAL_MAX_RETRIES, 10),
+      (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5)));
+  }
+
+
+  private Connection getConnectionRetryingOnException()
+    throws SQLException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return getConnection();
+      } catch (SQLException e) {
+        if (!retryCounter.shouldRetry()) {
+          LOG.error("HBaseAccessor getConnection failed after "
+            + retryCounter.getMaxAttempts() + " attempts");
+          throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+    }
+  }
+
+
+  /**
+   * Get a JDBC connection to the HBase store. The assumption is that the
+   * HBase configuration is present on the classpath and has been loaded by
+   * the caller into the Configuration object. Phoenix already caches the
+   * HConnection between the client and the HBase cluster.
+   *
+   * @return a {@link java.sql.Connection} to the metric store
+   */
+  public Connection getConnection() throws SQLException {
+    return dataSource.getConnection();
+  }
+
+  public static Map readMetricFromJSON(String json) throws IOException {
+    return mapper.readValue(json, metricValuesTypeRef);
+  }
+
+  @SuppressWarnings("unchecked")
+  static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
+    throws SQLException, IOException {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(rs.getString("METRIC_NAME"));
+    metric.setAppId(rs.getString("APP_ID"));
+    metric.setInstanceId(rs.getString("INSTANCE_ID"));
+    metric.setHostName(rs.getString("HOSTNAME"));
+    metric.setTimestamp(rs.getLong("SERVER_TIME"));
+    metric.setStartTime(rs.getLong("START_TIME"));
+    metric.setType(rs.getString("UNITS"));
+    metric.setMetricValues(
+      (Map<Long, Double>) readMetricFromJSON(rs.getString("METRICS")));
+    return metric;
+  }
+
+  static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
+    throws SQLException, IOException {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(rs.getString("METRIC_NAME"));
+    metric.setAppId(rs.getString("APP_ID"));
+    metric.setInstanceId(rs.getString("INSTANCE_ID"));
+    metric.setHostName(rs.getString("HOSTNAME"));
+    metric.setTimestamp(rs.getLong("SERVER_TIME"));
+    metric.setType(rs.getString("UNITS"));
+    return metric;
+  }
+
+  static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
+    throws SQLException {
+    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
+    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+
+    metricHostAggregate.setDeviation(0.0);
+    return metricHostAggregate;
+  }
+
+  static TimelineClusterMetric
+  getTimelineMetricClusterKeyFromResultSet(ResultSet rs)
+    throws SQLException, IOException {
+    TimelineClusterMetric metric = new TimelineClusterMetric(
+      rs.getString("METRIC_NAME"),
+      rs.getString("APP_ID"),
+      rs.getString("INSTANCE_ID"),
+      rs.getLong("SERVER_TIME"),
+      rs.getString("UNITS"));
+
+    return metric;
+  }
+
+  static MetricClusterAggregate
+  getMetricClusterAggregateFromResultSet(ResultSet rs)
+    throws SQLException {
+    MetricClusterAggregate agg = new MetricClusterAggregate();
+    agg.setSum(rs.getDouble("METRIC_SUM"));
+    agg.setMax(rs.getDouble("METRIC_MAX"));
+    agg.setMin(rs.getDouble("METRIC_MIN"));
+    agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));
+
+    agg.setDeviation(0.0);
+
+    return agg;
+  }
+
+  protected void initMetricSchema() {
+    Connection conn = null;
+    Statement stmt = null;
+
+    String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
+    String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);
+    String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");
+    String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");
+    String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");
+    String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000");
+    String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");
+
+    try {
+      LOG.info("Initializing metrics schema...");
+      conn = getConnectionRetryingOnException();
+      stmt = conn.createStatement();
+
+      stmt.executeUpdate(String.format(CREATE_METRICS_TABLE_SQL,
+        encoding, precisionTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL,
+        encoding, hostHourTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL,
+        encoding, hostMinTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
+        encoding, clusterMinTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+        encoding, clusterHourTtl, compression));
+      conn.commit();
+    } catch (SQLException sql) {
+      LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", sql);
+      throw new MetricsInitializationException(
+        "Error creating Metrics Schema in HBase using Phoenix.", sql);
+    } catch (InterruptedException e) {
+      LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", e);
+      throw new MetricsInitializationException(
+        "Error creating Metrics Schema in HBase using Phoenix.", e);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+    }
+  }
+
+  public void insertMetricRecords(TimelineMetrics metrics)
+    throws SQLException, IOException {
+
+    List<TimelineMetric> timelineMetrics = metrics.getMetrics();
+    if (timelineMetrics == null || timelineMetrics.isEmpty()) {
+      LOG.debug("Empty metrics insert request.");
+      return;
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement metricRecordStmt = null;
+    long currentTime = System.currentTimeMillis();
+
+    try {
+      metricRecordStmt = conn.prepareStatement(String.format(
+        UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
+
+      for (TimelineMetric metric : timelineMetrics) {
+        metricRecordStmt.clearParameters();
+
+        LOG.trace("host: " + metric.getHostName() + ", " +
+          "metricName = " + metric.getMetricName() + ", " +
+          "values: " + metric.getMetricValues());
+        Aggregator agg = new Aggregator();
+        double[] aggregates =  agg.calculateAggregates(
+          metric.getMetricValues());
+
+        metricRecordStmt.setString(1, metric.getMetricName());
+        metricRecordStmt.setString(2, metric.getHostName());
+        metricRecordStmt.setString(3, metric.getAppId());
+        metricRecordStmt.setString(4, metric.getInstanceId());
+        metricRecordStmt.setLong(5, currentTime);
+        metricRecordStmt.setLong(6, metric.getStartTime());
+        metricRecordStmt.setString(7, metric.getType());
+        metricRecordStmt.setDouble(8, aggregates[0]);
+        metricRecordStmt.setDouble(9, aggregates[1]);
+        metricRecordStmt.setDouble(10, aggregates[2]);
+        metricRecordStmt.setLong(11, (long)aggregates[3]);
+        String json =
+          TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+        metricRecordStmt.setString(12, json);
+
+        try {
+          metricRecordStmt.executeUpdate();
+        } catch (SQLException sql) {
+          LOG.error(sql);
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (metricRecordStmt != null) {
+        try {
+          metricRecordStmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+  }
+
+
+  @SuppressWarnings("unchecked")
+  public TimelineMetrics getMetricRecords(final Condition condition)
+    throws SQLException, IOException {
+
+    if (condition.isEmpty()) {
+      throw new SQLException("No filter criteria specified.");
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    TimelineMetrics metrics = new TimelineMetrics();
+
+    try {
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+      ResultSet rs = stmt.executeQuery();
+
+      while (rs.next()) {
+        TimelineMetric metric = getTimelineMetricFromResultSet(rs);
+
+        if (condition.isGrouped()) {
+          metrics.addOrMergeTimelineMetric(metric);
+        } else {
+          metrics.getMetrics().add(metric);
+        }
+      }
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    return metrics;
+  }
+
+  public void saveHostAggregateRecords(
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
+    String phoenixTableName) throws SQLException {
+
+    if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) {
+      Connection conn = getConnection();
+      PreparedStatement stmt = null;
+
+      long start = System.currentTimeMillis();
+      int rowCount = 0;
+
+      try {
+        stmt = conn.prepareStatement(
+          String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
+
+        for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
+          hostAggregateMap.entrySet()) {
+
+          TimelineMetric metric = metricAggregate.getKey();
+          MetricHostAggregate hostAggregate = metricAggregate.getValue();
+
+          rowCount++;
+          stmt.clearParameters();
+          stmt.setString(1, metric.getMetricName());
+          stmt.setString(2, metric.getHostName());
+          stmt.setString(3, metric.getAppId());
+          stmt.setString(4, metric.getInstanceId());
+          stmt.setLong(5, metric.getTimestamp());
+          stmt.setString(6, metric.getType());
+          stmt.setDouble(7, hostAggregate.getSum());
+          stmt.setDouble(8, hostAggregate.getMax());
+          stmt.setDouble(9, hostAggregate.getMin());
+          stmt.setDouble(10, hostAggregate.getNumberOfSamples());
+
+          try {
+            // TODO: Why is this exception swallowed?
+            stmt.executeUpdate();
+          } catch (SQLException sql) {
+            LOG.error(sql);
+          }
+
+          if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+            conn.commit();
+            rowCount = 0;
+          }
+
+        }
+
+        conn.commit();
+
+      } finally {
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException e) {
+            // Ignore
+          }
+        }
+        if (conn != null) {
+          try {
+            conn.close();
+          } catch (SQLException sql) {
+            // Ignore
+          }
+        }
+      }
+
+      long end = System.currentTimeMillis();
+
+      if ((end - start) > 60000L) {
+        LOG.info("Time to save map: " + (end - start) + ", " +
+          "thread = " + Thread.currentThread().getName());
+      }
+    }
+  }
+
+  /**
+   * Save cluster-level metric aggregate records.
+   *
+   * @throws SQLException
+   */
+  public void saveClusterAggregateRecords(
+    Map<TimelineClusterMetric, MetricClusterAggregate> records)
+    throws SQLException {
+
+      if (records == null || records.isEmpty()) {
+        LOG.debug("Empty aggregate records.");
+        return;
+      }
+
+      long start = System.currentTimeMillis();
+
+      Connection conn = getConnection();
+      PreparedStatement stmt = null;
+      try {
+        stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+        int rowCount = 0;
+
+        for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+          aggregateEntry : records.entrySet()) {
+          TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+          MetricClusterAggregate aggregate = aggregateEntry.getValue();
+
+          LOG.trace("clusterMetric = " + clusterMetric + ", " +
+            "aggregate = " + aggregate);
+
+          rowCount++;
+          stmt.clearParameters();
+          stmt.setString(1, clusterMetric.getMetricName());
+          stmt.setString(2, clusterMetric.getAppId());
+          stmt.setString(3, clusterMetric.getInstanceId());
+          stmt.setLong(4, clusterMetric.getTimestamp());
+          stmt.setString(5, clusterMetric.getType());
+          stmt.setDouble(6, aggregate.getSum());
+          stmt.setInt(7, aggregate.getNumberOfHosts());
+          stmt.setDouble(8, aggregate.getMax());
+          stmt.setDouble(9, aggregate.getMin());
+
+          try {
+            stmt.executeUpdate();
+          } catch (SQLException sql) {
+            // TODO: Why is this exception swallowed?
+            LOG.error(sql);
+          }
+
+          if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+            conn.commit();
+            rowCount = 0;
+          }
+        }
+
+        conn.commit();
+
+      } finally {
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException e) {
+            // Ignore
+          }
+        }
+        if (conn != null) {
+          try {
+            conn.close();
+          } catch (SQLException sql) {
+            // Ignore
+          }
+        }
+      }
+      long end = System.currentTimeMillis();
+      if ((end - start) > 60000L) {
+        LOG.info("Time to save: " + (end - start) + ", " +
+          "thread = " + Thread.currentThread().getName());
+      }
+    }
+
+  /**
+   * Save hourly cluster-level metric aggregate records.
+   *
+   * @throws SQLException
+   */
+  public void saveClusterAggregateHourlyRecords(
+    Map<TimelineClusterMetric, MetricHostAggregate> records,
+    String tableName)
+    throws SQLException {
+    if (records == null || records.isEmpty()) {
+      LOG.debug("Empty aggregate records.");
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(String.format
+        (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
+      int rowCount = 0;
+
+      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
+        aggregateEntry : records.entrySet()) {
+        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+        MetricHostAggregate aggregate = aggregateEntry.getValue();
+
+        LOG.trace("clusterMetric = " + clusterMetric + ", " +
+          "aggregate = " + aggregate);
+
+        rowCount++;
+        stmt.clearParameters();
+        stmt.setString(1, clusterMetric.getMetricName());
+        stmt.setString(2, clusterMetric.getAppId());
+        stmt.setString(3, clusterMetric.getInstanceId());
+        stmt.setLong(4, clusterMetric.getTimestamp());
+        stmt.setString(5, clusterMetric.getType());
+        stmt.setDouble(6, aggregate.getSum());
+//        stmt.setInt(7, aggregate.getNumberOfHosts());
+        stmt.setLong(7, aggregate.getNumberOfSamples());
+        stmt.setDouble(8, aggregate.getMax());
+        stmt.setDouble(9, aggregate.getMin());
+
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          // TODO: Exception is swallowed; there is no way to verify the upsert succeeded.
+          LOG.error(sql);
+        }
+
+        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+          conn.commit();
+          rowCount = 0;
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    long end = System.currentTimeMillis();
+    if ((end - start) > 60000L) {
+      LOG.info("Time to save: " + (end - start) + ", " +
+        "thread = " + Thread.currentThread().getName());
+    }
+  }
+
+
+  public TimelineMetrics getAggregateMetricRecords(final Condition condition)
+    throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new SQLException("No filter criteria specified.");
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    TimelineMetrics metrics = new TimelineMetrics();
+
+    try {
+      stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
+
+      ResultSet rs = stmt.executeQuery();
+
+      while (rs.next()) {
+        TimelineMetric metric = new TimelineMetric();
+        metric.setMetricName(rs.getString("METRIC_NAME"));
+        metric.setAppId(rs.getString("APP_ID"));
+        metric.setInstanceId(rs.getString("INSTANCE_ID"));
+        metric.setTimestamp(rs.getLong("SERVER_TIME"));
+        metric.setStartTime(rs.getLong("SERVER_TIME"));
+        Map<Long, Double> valueMap = new HashMap<Long, Double>();
+        valueMap.put(rs.getLong("SERVER_TIME"),
+          rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
+        metric.setMetricValues(valueMap);
+
+        if (condition.isGrouped()) {
+          metrics.addOrMergeTimelineMetric(metric);
+        } else {
+          metrics.getMetrics().add(metric);
+        }
+      }
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    LOG.info("Aggregate records size: " + metrics.getMetrics().size());
+    return metrics;
+  }
+}

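Tying the accessor together, a sketch of schema initialization followed by a point query. Metric and host names are illustrative, both config files are assumed to be on the classpath, and same-package access is assumed since initMetricSchema is protected (Condition is the static-imported PhoenixTransactSQL.Condition):

    Configuration hbaseConf = new Configuration(true);
    hbaseConf.addResource("hbase-site.xml");
    Configuration metricsConf = new Configuration(true);
    metricsConf.addResource("ams-site.xml");

    PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
    accessor.initMetricSchema();  // CREATE TABLE IF NOT EXISTS for all five tables

    long now = System.currentTimeMillis();
    Condition condition = new Condition(
      Collections.singletonList("cpu_user"),  // metric names (illustrative)
      "host1.example.com",                    // hostname (illustrative)
      "HOST", null,                           // appId, instanceId
      now - TimeUnit.HOURS.toMillis(1), now,  // time range: last hour
      null, true);                            // no row limit, grouped
    TimelineMetrics metrics = accessor.getMetricRecords(condition);
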
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
new file mode 100644
index 0000000..0d53f5f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Encapsulates all metrics-related SQL queries.
+ */
+public class PhoenixTransactSQL {
+
+  static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+  // TODO: Configurable TTL values
+  /**
+   * Create table to store individual metric records.
+   */
+  public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
+    "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " +
+    "HOSTNAME VARCHAR, " +
+    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+    "APP_ID VARCHAR, " +
+    "INSTANCE_ID VARCHAR, " +
+    "START_TIME UNSIGNED_LONG, " +
+    "UNITS CHAR(20), " +
+    "METRIC_SUM DOUBLE, " +
+    "METRIC_COUNT UNSIGNED_INT, " +
+    "METRIC_MAX DOUBLE, " +
+    "METRIC_MIN DOUBLE, " +
+    "METRICS VARCHAR CONSTRAINT pk " +
+    "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " +
+    "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+    "TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
+      "(METRIC_NAME VARCHAR, " +
+      "HOSTNAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE," +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE," +
+      "METRIC_MIN DOUBLE CONSTRAINT pk " +
+      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
+      "(METRIC_NAME VARCHAR, " +
+      "HOSTNAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE," +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE," +
+      "METRIC_MIN DOUBLE CONSTRAINT pk " +
+      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
+      " COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
+      "(METRIC_NAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE, " +
+      "HOSTS_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE, " +
+      "METRIC_MIN DOUBLE " +
+      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
+      "(METRIC_NAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE, " +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE, " +
+      "METRIC_MIN DOUBLE " +
+      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
+
+  /**
+   * Insert into metric records table.
+   */
+  public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
+    "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT, " +
+    "METRICS) VALUES " +
+    "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
+    "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "HOSTS_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" +
+    " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
+    "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+    "SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN," +
+    "METRIC_COUNT) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Retrieve rows from a metrics records table; the first %s takes an
+   * optional query hint and the second the table name.
+   */
+  public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " +
+    "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT, " +
+    "METRICS " +
+    "FROM %s";
+
+  public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " +
+    "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT " +
+    "FROM %s";
+
+  public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " +
+    "METRIC_NAME, APP_ID, " +
+    "INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "HOSTS_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN " +
+    "FROM METRIC_AGGREGATE";
+
+  public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
+  public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
+    "METRIC_RECORD_MINUTE";
+  public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
+    "METRIC_RECORD_HOURLY";
+  public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
+    "METRIC_AGGREGATE";
+  public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
+    "METRIC_AGGREGATE_HOURLY";
+  public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
+  public static final String DEFAULT_ENCODING = "FAST_DIFF";
+  public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
+
+  /**
+   * Query hint that lets the HBase scan skip store files outside the
+   * requested time range, preventing a full table scan of metric records.
+   * @return Phoenix hint string
+   */
+  public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
+    return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta));
+  }
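+
+  // For example, getNaiveTimeRangeHint(1417536000000L, NATIVE_TIME_RANGE_DELTA)
+  // returns "/*+ NATIVE_TIME_RANGE(1417535880000) */", which the callers
+  // splice into the leading %s of the SELECT templates.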
+
+  public static PreparedStatement prepareGetMetricsSqlStmt(
+    Connection connection, Condition condition) throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new IllegalArgumentException("Condition is empty.");
+    }
+    String stmtStr;
+    if (condition.getStatement() != null) {
+      stmtStr = condition.getStatement();
+    } else {
+      stmtStr = String.format(GET_METRIC_SQL,
+        condition.getStartTime() == null ? "" :
+          getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA),
+        METRICS_RECORD_TABLE_NAME);
+    }
+
+    StringBuilder sb = new StringBuilder(stmtStr);
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    String orderByClause = condition.getOrderByClause();
+
+    if (orderByClause != null) {
+      sb.append(orderByClause);
+    } else {
+      sb.append(" ORDER BY METRIC_NAME, SERVER_TIME ");
+    }
+    if (condition.getLimit() != null) {
+      sb.append(" LIMIT ").append(condition.getLimit());
+    }
+
+    LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
+    PreparedStatement stmt = connection.prepareStatement(sb.toString());
+    int pos = 1;
+    if (condition.getMetricNames() != null) {
+      for (; pos <= condition.getMetricNames().size(); pos++) {
+        LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
+        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+      }
+    }
+    if (condition.getHostname() != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
+      stmt.setString(pos++, condition.getHostname());
+    }
+    // TODO: Upper case all strings on POST
+    if (condition.getAppId() != null) {
+      // TODO: fix case of appId coming from host metrics
+      String appId = condition.getAppId();
+      if (!condition.getAppId().equals("HOST")) {
+        appId = appId.toLowerCase();
+      }
+      LOG.debug("Setting pos: " + pos + ", value: " + appId);
+      stmt.setString(pos++, appId);
+    }
+    if (condition.getInstanceId() != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
+      stmt.setString(pos++, condition.getInstanceId());
+    }
+    if (condition.getStartTime() != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
+      stmt.setLong(pos++, condition.getStartTime());
+    }
+    if (condition.getEndTime() != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
+      stmt.setLong(pos, condition.getEndTime());
+    }
+    if (condition.getFetchSize() != null) {
+      stmt.setFetchSize(condition.getFetchSize());
+    }
+
+    return stmt;
+  }
+
+  public static PreparedStatement prepareGetAggregateSqlStmt(
+    Connection connection, Condition condition) throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new IllegalArgumentException("Condition is empty.");
+    }
+
+    // The %s in GET_CLUSTER_AGGREGATE_SQL is a query hint placeholder and
+    // must be resolved before the statement is prepared.
+    StringBuilder sb = new StringBuilder(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      condition.getStartTime() == null ? "" :
+        getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA)));
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    sb.append(" ORDER BY METRIC_NAME, SERVER_TIME");
+    if (condition.getLimit() != null) {
+      sb.append(" LIMIT ").append(condition.getLimit());
+    }
+
+    LOG.debug("SQL => " + sb.toString() + ", condition => " + condition);
+    PreparedStatement stmt = connection.prepareStatement(sb.toString());
+    int pos = 1;
+    if (condition.getMetricNames() != null) {
+      for (; pos <= condition.getMetricNames().size(); pos++) {
+        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+      }
+    }
+    // TODO: Upper case all strings on POST
+    if (condition.getAppId() != null) {
+      stmt.setString(pos++, condition.getAppId().toLowerCase());
+    }
+    if (condition.getInstanceId() != null) {
+      stmt.setString(pos++, condition.getInstanceId());
+    }
+    if (condition.getStartTime() != null) {
+      stmt.setLong(pos++, condition.getStartTime());
+    }
+    if (condition.getEndTime() != null) {
+      stmt.setLong(pos, condition.getEndTime());
+    }
+
+    return stmt;
+  }
+
+  static class Condition {
+    List<String> metricNames;
+    String hostname;
+    String appId;
+    String instanceId;
+    Long startTime;
+    Long endTime;
+    Integer limit;
+    boolean grouped;
+    boolean noLimit = false;
+    Integer fetchSize;
+    String statement;
+    Set<String> orderByColumns = new LinkedHashSet<String>();
+
+    Condition(List<String> metricNames, String hostname, String appId,
+              String instanceId, Long startTime, Long endTime, Integer limit,
+              boolean grouped) {
+      this.metricNames = metricNames;
+      this.hostname = hostname;
+      this.appId = appId;
+      this.instanceId = instanceId;
+      this.startTime = startTime;
+      this.endTime = endTime;
+      this.limit = limit;
+      this.grouped = grouped;
+    }
+
+    String getStatement() {
+      return statement;
+    }
+
+    void setStatement(String statement) {
+      this.statement = statement;
+    }
+
+    List<String> getMetricNames() {
+      return metricNames == null || metricNames.isEmpty() ? null : metricNames;
+    }
+
+    String getMetricsClause() {
+      if (metricNames == null) {
+        return null;
+      }
+      // One bind parameter per metric name: "(?, ?, ..., ?)"
+      StringBuilder sb = new StringBuilder("(");
+      for (int i = 0; i < metricNames.size(); i++) {
+        if (i > 0) {
+          sb.append(", ");
+        }
+        sb.append("?");
+      }
+      sb.append(")");
+      return sb.toString();
+    }
+
+    String getConditionClause() {
+      StringBuilder sb = new StringBuilder();
+      boolean appendConjunction = false;
+
+      if (getMetricNames() != null) {
+        sb.append("METRIC_NAME IN ");
+        sb.append(getMetricsClause());
+        appendConjunction = true;
+      }
+      // Emit " AND" only immediately before the next predicate, so a
+      // condition that stops early never ends with a dangling conjunction.
+      if (getHostname() != null) {
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+        sb.append(" HOSTNAME = ?");
+        appendConjunction = true;
+      }
+      if (getAppId() != null) {
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+        sb.append(" APP_ID = ?");
+        appendConjunction = true;
+      }
+      if (getInstanceId() != null) {
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+        sb.append(" INSTANCE_ID = ?");
+        appendConjunction = true;
+      }
+      if (getStartTime() != null) {
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+        sb.append(" SERVER_TIME >= ?");
+        appendConjunction = true;
+      }
+      if (getEndTime() != null) {
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+        sb.append(" SERVER_TIME < ?");
+      }
+      return sb.toString();
+    }
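+
+    // For example, a condition carrying two metric names, a hostname and a
+    // start time yields:
+    //   METRIC_NAME IN (?, ?) AND HOSTNAME = ? AND SERVER_TIME >= ?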
+
+    String getHostname() {
+      return hostname == null || hostname.isEmpty() ? null : hostname;
+    }
+
+    String getAppId() {
+      return appId == null || appId.isEmpty() ? null : appId;
+    }
+
+    String getInstanceId() {
+      return instanceId == null || instanceId.isEmpty() ? null : instanceId;
+    }
+
+    /**
+     * Interpret the timestamp as seconds and convert to milliseconds when
+     * it is too small to already be in milliseconds.
+     */
+    Long getStartTime() {
+      if (startTime == null) {
+        return null;
+      }
+      if (startTime < 9999999999L) {
+        return startTime * 1000;
+      }
+      return startTime;
+    }
+
+    Long getEndTime() {
+      if (endTime == null) {
+        return null;
+      }
+      if (endTime < 9999999999L) {
+        return endTime * 1000;
+      }
+      return endTime;
+    }
+
+    void setNoLimit() {
+      this.noLimit = true;
+    }
+
+    Integer getLimit() {
+      if (noLimit) {
+        return null;
+      }
+      return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
+    }
+
+    boolean isGrouped() {
+      return grouped;
+    }
+
+    boolean isEmpty() {
+      return (metricNames == null || metricNames.isEmpty())
+        && (hostname == null || hostname.isEmpty())
+        && (appId == null || appId.isEmpty())
+        && (instanceId == null || instanceId.isEmpty())
+        && startTime == null
+        && endTime == null;
+    }
+
+    Integer getFetchSize() {
+      return fetchSize;
+    }
+
+    void setFetchSize(Integer fetchSize) {
+      this.fetchSize = fetchSize;
+    }
+
+    void addOrderByColumn(String column) {
+      orderByColumns.add(column);
+    }
+
+    String getOrderByClause() {
+      String orderByStr = " ORDER BY ";
+      if (!orderByColumns.isEmpty()) {
+        StringBuilder sb = new StringBuilder(orderByStr);
+        for (String orderByColumn : orderByColumns) {
+          if (sb.length() != orderByStr.length()) {
+            sb.append(", ");
+          }
+          sb.append(orderByColumn);
+        }
+        sb.append(" ");
+        return sb.toString();
+      }
+      return null;
+    }
+
+    @Override
+    public String toString() {
+      return "Condition{" +
+        "metricNames=" + metricNames +
+        ", hostname='" + hostname + '\'' +
+        ", appId='" + appId + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", startTime=" + startTime +
+        ", endTime=" + endTime +
+        ", limit=" + limit +
+        ", grouped=" + grouped +
+        ", orderBy=" + orderByColumns +
+        ", noLimit=" + noLimit +
+        '}';
+    }
+  }
+}
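
As a usage sketch rather than shipped code (connection, startTime and endTime are assumed to be in scope, the two longs in milliseconds), a read builds a Condition and lets prepareGetMetricsSqlStmt assemble the SQL and bind the parameters in order:

  Condition condition = new Condition(
    Arrays.asList("cpu_user", "cpu_system"),   // metric names
    "host1.example.com", "HOST", null,         // hostname, appId, instanceId
    startTime, endTime,
    null, false);                              // default limit, ungrouped
  PreparedStatement stmt =
    PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
  ResultSet rs = stmt.executeQuery();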

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
new file mode 100644
index 0000000..d227993
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+public class TimelineClusterMetric {
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private long timestamp;
+  private String type;
+
+  TimelineClusterMetric(String metricName, String appId, String instanceId,
+                        long timestamp, String type) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.timestamp = timestamp;
+    this.type = type;
+  }
+
+  String getMetricName() {
+    return metricName;
+  }
+
+  String getAppId() {
+    return appId;
+  }
+
+  String getInstanceId() {
+    return instanceId;
+  }
+
+  long getTimestamp() {
+    return timestamp;
+  }
+
+  String getType() { return type; }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineClusterMetric that = (TimelineClusterMetric) o;
+
+    if (timestamp != that.timestamp) return false;
+    if (appId != null ? !appId.equals(that.appId) : that.appId != null)
+      return false;
+    if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
+      return false;
+    if (!metricName.equals(that.metricName)) return false;
+
+    return true;
+  }
+
+  public boolean equalsExceptTime(TimelineClusterMetric metric) {
+    if (!metricName.equals(metric.metricName)) return false;
+    if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
+      return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metricName.hashCode();
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineClusterMetric{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", timestamp=" + timestamp +
+      '}';
+  }
+}
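
Because equals() and hashCode() key on metric name, app, instance and timestamp (type is not part of the identity), instances serve directly as map keys when values are accumulated per cluster time slice. A minimal sketch with invented values:

  Map<TimelineClusterMetric, Double> sums =
    new HashMap<TimelineClusterMetric, Double>();
  TimelineClusterMetric key = new TimelineClusterMetric(
    "cpu_user", "HOST", null, 1417536000000L, null);
  sums.put(key, 42.0);
  // A later instance with the same name/app/instance/timestamp maps to the
  // same entry, whatever its type field holds.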

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
new file mode 100644
index 0000000..cab154b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
+public class TimelineMetricAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG =
+    LogFactory.getLog(TimelineMetricAggregator.class);
+
+  private final String checkpointLocation;
+  private final Long sleepIntervalMillis;
+  private final Integer checkpointCutOffMultiplier;
+  private final String hostAggregatorDisabledParam;
+  private final String tableName;
+  private final String outputTableName;
+  private final Long nativeTimeRangeDelay;
+
+  public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                  Configuration metricsConf,
+                                  String checkpointLocation,
+                                  Long sleepIntervalMillis,
+                                  Integer checkpointCutOffMultiplier,
+                                  String hostAggregatorDisabledParam,
+                                  String tableName,
+                                  String outputTableName,
+                                  Long nativeTimeRangeDelay) {
+    super(hBaseAccessor, metricsConf);
+    this.checkpointLocation = checkpointLocation;
+    this.sleepIntervalMillis = sleepIntervalMillis;
+    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
+    this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
+    this.tableName = tableName;
+    this.outputTableName = outputTableName;
+    this.nativeTimeRangeDelay = nativeTimeRangeDelay;
+  }
+
+  @Override
+  protected String getCheckpointLocation() {
+    return checkpointLocation;
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+      aggregateMetricsFromResultSet(rs);
+
+    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+    hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
+      outputTableName);
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
+      tableName));
+    condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("HOSTNAME");
+    condition.addOrderByColumn("APP_ID");
+    condition.addOrderByColumn("INSTANCE_ID");
+    condition.addOrderByColumn("SERVER_TIME");
+    return condition;
+  }
+
+  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
+      (ResultSet rs) throws IOException, SQLException {
+    TimelineMetric existingMetric = null;
+    MetricHostAggregate hostAggregate = null;
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+      new HashMap<TimelineMetric, MetricHostAggregate>();
+
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if (existingMetric == null) {
+        // First row
+        existingMetric = currentMetric;
+        hostAggregate = new MetricHostAggregate();
+        hostAggregateMap.put(currentMetric, hostAggregate);
+      }
+
+      if (existingMetric.equalsExceptTime(currentMetric)) {
+        // Recalculate totals with current metric
+        hostAggregate.updateAggregates(currentHostAggregate);
+      } else {
+        // Switched over to a new metric - save existing - create new aggregate
+        hostAggregate = new MetricHostAggregate();
+        hostAggregate.updateAggregates(currentHostAggregate);
+        hostAggregateMap.put(currentMetric, hostAggregate);
+        existingMetric = currentMetric;
+      }
+    }
+    return hostAggregateMap;
+  }
+
+  @Override
+  protected Long getSleepIntervalMillis() {
+    return sleepIntervalMillis;
+  }
+
+  @Override
+  protected Integer getCheckpointCutOffMultiplier() {
+    return checkpointCutOffMultiplier;
+  }
+
+  @Override
+  protected boolean isDisabled() {
+    return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
+  }
+}
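
The single pass in aggregateMetricsFromResultSet works because prepareMetricQueryCondition orders rows by the full metric key, so equal keys arrive consecutively. A reduced sketch of the same grouping pattern, with sortedRows and readAggregate as stand-ins for the result-set iteration and per-row extraction:

  Map<TimelineMetric, MetricHostAggregate> result =
    new HashMap<TimelineMetric, MetricHostAggregate>();
  TimelineMetric prev = null;
  MetricHostAggregate bucket = null;
  for (TimelineMetric m : sortedRows) {          // ordered by metric key
    if (prev == null || !prev.equalsExceptTime(m)) {
      bucket = new MetricHostAggregate();        // a new key opens a new bucket
      result.put(m, bucket);
      prev = m;
    }
    bucket.updateAggregates(readAggregate(m));   // fold this row into the bucket
  }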

http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..8b10079
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+/**
+ * Factory for the host-level metric aggregators that roll raw metric
+ * records up into minute and hourly aggregates.
+ */
+public class TimelineMetricAggregatorFactory {
+  private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-checkpoint";
+  private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-hourly-checkpoint";
+
+  public static TimelineMetricAggregator createTimelineMetricAggregatorMinute(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      MINUTE_AGGREGATE_CHECKPOINT_FILE);
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong(
+      HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300L));  // 5 minutes
+
+    int checkpointCutOffMultiplier = metricsConf.getInt(
+      HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
+
+    String inputTableName = METRICS_RECORD_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000L);  // 2 minute native time range delay
+  }
+
+  public static TimelineMetricAggregator createTimelineMetricAggregatorHourly(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong(
+      HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600L));  // 1 hour
+
+    int checkpointCutOffMultiplier = metricsConf.getInt(
+      HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
+
+    String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+
+    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      3600000L);  // 1 hour native time range delay
+  }
+}
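
As a wiring sketch rather than shipped code (hBaseAccessor stands in for an initialized PhoenixHBaseAccessor, and the 120 second override is only an example), callers can tune the rollup cadence through the configuration keys the factory reads:

  Configuration metricsConf = new Configuration();
  // Run the minute rollup every two minutes instead of the five minute default.
  metricsConf.setLong(HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120L);
  TimelineMetricAggregator minuteAggregator =
    TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
      hBaseAccessor, metricsConf);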