You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/02 18:28:32 UTC
[16/30] ambari git commit: AMBARI-5707. Replace Ganglia with high
performant and pluggable Metrics System. (swagle)
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
new file mode 100644
index 0000000..652c492
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class DefaultPhoenixDataSource implements ConnectionProvider {
+
+ static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
+ private static final String ZOOKEEPER_CLIENT_PORT =
+ "hbase.zookeeper.property.clientPort";
+ private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+ private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
+ private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
+ private final String url;
+
+ public DefaultPhoenixDataSource(Configuration hbaseConf) {
+ String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT,
+ "2181");
+ String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
+ String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase");
+ if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+ throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+ "access HBase store using Phoenix.");
+ }
+
+ url = String.format(connectionUrl,
+ zookeeperQuorum,
+ zookeeperClientPort,
+ znodeParent);
+ }
+
+ /**
+ * Get JDBC connection to HBase store. Assumption is that the hbase
+ * configuration is present on the classpath and loaded by the caller into
+ * the Configuration object.
+ * Phoenix already caches the HConnection between the client and HBase
+ * cluster.
+ *
+ * @return @java.sql.Connection
+ */
+ public Connection getConnection() throws SQLException {
+
+ LOG.debug("Metric store connection url: " + url);
+ try {
+ return DriverManager.getConnection(url);
+ } catch (SQLException e) {
+ LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+
+ throw e;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
new file mode 100644
index 0000000..9364187
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+import java.io.IOException;
+import java.net.URL;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
+
+/**
+ * {@link TimelineMetricStore} service that reads and writes timeline metrics
+ * through a {@link PhoenixHBaseAccessor}. On initialization it loads the
+ * hbase-site and ams-site resources from the classpath, initializes the
+ * metric schema and starts the aggregator threads that are not disabled.
+ */
+public class HBaseTimelineMetricStore extends AbstractService
+    implements TimelineMetricStore {
+
+  static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class);
+  private PhoenixHBaseAccessor hBaseAccessor;
+
+  /**
+   * Construct the service.
+   */
+  public HBaseTimelineMetricStore() {
+    super(HBaseTimelineMetricStore.class.getName());
+  }
+
+  /**
+   * Locates the hbase-site and ams-site configuration resources on the
+   * classpath and initializes the metrics subsystem from them. The
+   * {@code conf} argument itself is not consulted.
+   *
+   * @param conf service configuration (unused here)
+   * @throws IllegalStateException if either configuration resource is
+   *                               missing from the classpath
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    // Prefer the context class loader, fall back to the one of this class.
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    if (classLoader == null) {
+      classLoader = getClass().getClassLoader();
+    }
+    URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE);
+    URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+    LOG.info("Found hbase site configuration: " + hbaseResUrl);
+    LOG.info("Found metric service configuration: " + amsResUrl);
+
+    if (hbaseResUrl == null) {
+      throw new IllegalStateException("Unable to initialize the metrics " +
+        "subsystem. No hbase-site present in the classpath.");
+    }
+
+    if (amsResUrl == null) {
+      throw new IllegalStateException("Unable to initialize the metrics " +
+        "subsystem. No ams-site present in the classpath.");
+    }
+
+    Configuration hbaseConf = new Configuration(true);
+    hbaseConf.addResource(hbaseResUrl.toURI().toURL());
+    Configuration metricsConf = new Configuration(true);
+    metricsConf.addResource(amsResUrl.toURI().toURL());
+
+    initializeSubsystem(hbaseConf, metricsConf);
+  }
+
+  /**
+   * Creates the HBase accessor, initializes the metric schema and starts one
+   * thread per aggregator that does not report itself as disabled.
+   *
+   * @param hbaseConf   configuration loaded from the hbase-site resource
+   * @param metricsConf configuration loaded from the ams-site resource
+   */
+  private void initializeSubsystem(Configuration hbaseConf,
+                                   Configuration metricsConf) {
+    hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
+    hBaseAccessor.initMetricSchema();
+
+    // Start the cluster aggregator
+    TimelineMetricClusterAggregator minuteClusterAggregator =
+      new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf);
+    if (!minuteClusterAggregator.isDisabled()) {
+      Thread aggregatorThread = new Thread(minuteClusterAggregator);
+      aggregatorThread.start();
+    }
+
+    // Start the cluster aggregator hourly
+    TimelineMetricClusterAggregatorHourly hourlyClusterAggregator =
+      new TimelineMetricClusterAggregatorHourly(hBaseAccessor, metricsConf);
+    if (!hourlyClusterAggregator.isDisabled()) {
+      Thread aggregatorThread = new Thread(hourlyClusterAggregator);
+      aggregatorThread.start();
+    }
+
+    // Start the 5 minute aggregator
+    TimelineMetricAggregator minuteHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute
+        (hBaseAccessor, metricsConf);
+    if (!minuteHostAggregator.isDisabled()) {
+      Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
+      minuteAggregatorThread.start();
+    }
+
+    // Start hourly host aggregator
+    TimelineMetricAggregator hourlyHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly
+        (hBaseAccessor, metricsConf);
+    if (!hourlyHostAggregator.isDisabled()) {
+      Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
+      aggregatorHourlyThread.start();
+    }
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  /**
+   * Fetches metrics matching the given filter. When no hostname is given the
+   * aggregate records are queried, otherwise the per-host metric records.
+   *
+   * @return the metrics returned by the HBase accessor for the condition
+   */
+  //TODO: update to work with HOSTS_COUNT and METRIC_COUNT
+  @Override
+  public TimelineMetrics getTimelineMetrics(List<String> metricNames,
+      String hostname, String applicationId, String instanceId,
+      Long startTime, Long endTime, Integer limit,
+      boolean groupedByHosts) throws SQLException, IOException {
+
+    Condition condition = new Condition(metricNames, hostname, applicationId,
+      instanceId, startTime, endTime, limit, groupedByHosts);
+
+    if (hostname == null) {
+      return hBaseAccessor.getAggregateMetricRecords(condition);
+    }
+
+    return hBaseAccessor.getMetricRecords(condition);
+  }
+
+  /**
+   * Fetches the records of a single metric and merges them into one
+   * {@link TimelineMetric}. Identity fields (name, app id, instance id, host
+   * name, start time) are copied from the first record; the data points of
+   * all records are merged into one map keyed by timestamp.
+   *
+   * @return the merged metric, or an empty {@link TimelineMetric} when the
+   *         query returned no records
+   */
+  @Override
+  public TimelineMetric getTimelineMetric(String metricName, String hostname,
+      String applicationId, String instanceId, Long startTime,
+      Long endTime, Integer limit)
+      throws SQLException, IOException {
+
+    TimelineMetrics metrics = hBaseAccessor.getMetricRecords(
+      new Condition(Collections.singletonList(metricName), hostname,
+        applicationId, instanceId, startTime, endTime, limit, true)
+    );
+
+    TimelineMetric metric = new TimelineMetric();
+    List<TimelineMetric> metricList = metrics.getMetrics();
+
+    if (metricList != null && !metricList.isEmpty()) {
+      TimelineMetric first = metricList.get(0);
+      metric.setMetricName(first.getMetricName());
+      metric.setAppId(first.getAppId());
+      metric.setInstanceId(first.getInstanceId());
+      metric.setHostName(first.getHostName());
+      // Assumption that metrics are ordered by start time
+      metric.setStartTime(first.getStartTime());
+      // A sorted map keeps the merged data points in timestamp order; a
+      // HashMap would hand them back in arbitrary order.
+      Map<Long, Double> metricRecords = new TreeMap<Long, Double>();
+      for (TimelineMetric timelineMetric : metricList) {
+        Map<? extends Long, ? extends Double> values =
+          timelineMetric.getMetricValues();
+        // A record without data points contributes nothing to the merge.
+        if (values != null) {
+          metricRecords.putAll(values);
+        }
+      }
+      metric.setMetricValues(metricRecords);
+    }
+
+    return metric;
+  }
+
+
+  /**
+   * Stores the given metrics through the HBase accessor.
+   *
+   * @return an empty {@link TimelinePutResponse}; failures are signalled by
+   *         the thrown exception rather than through the response
+   */
+  @Override
+  public TimelinePutResponse putMetrics(TimelineMetrics metrics)
+      throws SQLException, IOException {
+
+    // Error indicated by the Sql exception
+    TimelinePutResponse response = new TimelinePutResponse();
+
+    hBaseAccessor.insertMetricRecords(metrics);
+
+    return response;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
new file mode 100644
index 0000000..61e15d7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Base class for metric aggregates. Holds the running sum, deviation,
+ * maximum and minimum of the aggregated values and can render itself as
+ * JSON.
+ */
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+  @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  protected Double sum = 0.0;
+  protected Double deviation;
+  // Double.MIN_VALUE is the smallest *positive* double, so using it as the
+  // starting maximum would hide every negative sample. Start from the most
+  // negative finite value instead.
+  protected Double max = -Double.MAX_VALUE;
+  protected Double min = Double.MAX_VALUE;
+
+  public MetricAggregate() {
+  }
+
+  MetricAggregate(Double sum, Double deviation, Double max,
+                  Double min) {
+    this.sum = sum;
+    this.deviation = deviation;
+    this.max = max;
+    this.min = min;
+  }
+
+  /**
+   * Adds the given value to the running sum. A null value is ignored; a
+   * null running sum is treated as not yet set.
+   */
+  void updateSum(Double sum) {
+    if (sum == null) {
+      return;
+    }
+    this.sum = (this.sum == null) ? sum : this.sum + sum;
+  }
+
+  /**
+   * Raises the maximum to the given value if it is larger than the current
+   * one (or if no maximum is set). A null value is ignored.
+   */
+  void updateMax(Double max) {
+    if (max != null && (this.max == null || max > this.max)) {
+      this.max = max;
+    }
+  }
+
+  /**
+   * Lowers the minimum to the given value if it is smaller than the current
+   * one (or if no minimum is set). A null value is ignored.
+   */
+  void updateMin(Double min) {
+    if (min != null && (this.min == null || min < this.min)) {
+      this.min = min;
+    }
+  }
+
+  @JsonProperty("sum")
+  Double getSum() {
+    return sum;
+  }
+
+  @JsonProperty("deviation")
+  Double getDeviation() {
+    return deviation;
+  }
+
+  @JsonProperty("max")
+  Double getMax() {
+    return max;
+  }
+
+  @JsonProperty("min")
+  Double getMin() {
+    return min;
+  }
+
+  public void setSum(Double sum) {
+    this.sum = sum;
+  }
+
+  public void setDeviation(Double deviation) {
+    this.deviation = deviation;
+  }
+
+  public void setMax(Double max) {
+    this.max = max;
+  }
+
+  public void setMin(Double min) {
+    this.min = min;
+  }
+
+  /**
+   * Serializes this aggregate with the shared Jackson mapper.
+   *
+   * @return the JSON representation of this aggregate
+   * @throws IOException if serialization fails
+   */
+  public String toJSON() throws IOException {
+    return mapper.writeValueAsString(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
new file mode 100644
index 0000000..c13c85f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+*
+*/
+public class MetricClusterAggregate extends MetricAggregate {
+ private int numberOfHosts;
+
+ @JsonCreator
+ public MetricClusterAggregate() {
+ }
+
+ MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+ Double max, Double min) {
+ super(sum, deviation, max, min);
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ @JsonProperty("numberOfHosts")
+ int getNumberOfHosts() {
+ return numberOfHosts;
+ }
+
+ void updateNumberOfHosts(int count) {
+ this.numberOfHosts += count;
+ }
+
+ public void setNumberOfHosts(int numberOfHosts) {
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ /**
+ * Find and update min, max and avg for a minute
+ */
+ void updateAggregates(MetricClusterAggregate hostAggregate) {
+ updateMax(hostAggregate.getMax());
+ updateMin(hostAggregate.getMin());
+ updateSum(hostAggregate.getSum());
+ updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+ }
+
+ @Override
+ public String toString() {
+// MetricClusterAggregate
+ return "MetricAggregate{" +
+ "sum=" + sum +
+ ", numberOfHosts=" + numberOfHosts +
+ ", deviation=" + deviation +
+ ", max=" + max +
+ ", min=" + min +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
new file mode 100644
index 0000000..02cc207
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents a collection of minute based aggregation of values for
+ * resolution greater than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+  private long numberOfSamples = 0;
+
+  /**
+   * Creates an empty aggregate: sum and deviation are 0.0, and max / min
+   * start at the extreme finite values so that the first sample replaces
+   * them.
+   */
+  @JsonCreator
+  public MetricHostAggregate() {
+    // Double.MIN_VALUE is the smallest positive double, not the most
+    // negative one, so -Double.MAX_VALUE is the correct starting maximum.
+    super(0.0, 0.0, -Double.MAX_VALUE, Double.MAX_VALUE);
+  }
+
+  public MetricHostAggregate(Double sum, int numberOfSamples,
+                             Double deviation,
+                             Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  /**
+   * @return the number of samples, reported as 1 when no sample count has
+   *         been recorded
+   */
+  @JsonProperty("numberOfSamples")
+  long getNumberOfSamples() {
+    return numberOfSamples == 0 ? 1 : numberOfSamples;
+  }
+
+  void updateNumberOfSamples(long count) {
+    this.numberOfSamples += count;
+  }
+
+  public void setNumberOfSamples(long numberOfSamples) {
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  /**
+   * @return the sum divided by the sample count. The count comes from
+   *         {@link #getNumberOfSamples()}, so an aggregate without recorded
+   *         samples yields the sum itself instead of NaN or Infinity.
+   */
+  public double getAvg() {
+    return sum / getNumberOfSamples();
+  }
+
+  /**
+   * Find and update min, max and avg for a minute
+   */
+  void updateAggregates(MetricHostAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfSamples(hostAggregate.getNumberOfSamples());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricHostAggregate{" +
+      "sum=" + sum +
+      ", numberOfSamples=" + numberOfSamples +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
new file mode 100644
index 0000000..88a427a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+/**
+ * Unchecked exception raised when the metrics schema cannot be initialized.
+ * It extends RuntimeException because the situation is not recoverable: it
+ * is meant to be handled by the main or service method, followed by
+ * shutdown.
+ */
+public class MetricsInitializationException extends RuntimeException {
+
+  /** Creates the exception without a message or a cause. */
+  public MetricsInitializationException() {
+    super();
+  }
+
+  /**
+   * Creates the exception with a detail message.
+   *
+   * @param msg description of the failure
+   */
+  public MetricsInitializationException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * Creates the exception with a detail message and the underlying cause.
+   *
+   * @param msg description of the failure
+   * @param t   the failure that triggered this exception
+   */
+  public MetricsInitializationException(String msg, Throwable t) {
+    super(msg, t);
+  }
+
+  /**
+   * Creates the exception wrapping the underlying cause.
+   *
+   * @param t the failure that triggered this exception
+   */
+  public MetricsInitializationException(Throwable t) {
+    super(t);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
new file mode 100644
index 0000000..4f248b7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
+
+/**
+ * Provides a facade over the Phoenix API to access HBase schema
+ */
+public class PhoenixHBaseAccessor {
+
+ // Logger shared by all instances of this class.
+ private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
+ // HBase configuration supplied to the constructor.
+ private final Configuration hbaseConf;
+ // Metrics configuration: source of the retry, result-limit, TTL, encoding
+ // and compression settings read by this class.
+ private final Configuration metricsConf;
+ // Creates the retry counter used while waiting for a connection.
+ private final RetryCounterFactory retryCounterFactory;
+
+ // Number of upserted rows after which the save* methods issue an
+ // intermediate commit.
+ static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
+ /**
+ * Number of metric records assumed per minute; used to derive the initial
+ * RESULTSET_LIMIT of one day of data (4 metrics/min * 60 * 24).
+ */
+ private static final int METRICS_PER_MINUTE = 4;
+ // NOTE(review): mutable static; it is overwritten from GLOBAL_RESULT_LIMIT
+ // every time an accessor is constructed, so the last constructed instance
+ // determines the value seen by everyone.
+ public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) *
+ METRICS_PER_MINUTE;
+ // JSON mapper used by readMetricFromJSON.
+ private static ObjectMapper mapper = new ObjectMapper();
+
+ // Target type for deserializing the METRICS column: timestamp -> value.
+ private static TypeReference<Map<Long, Double>> metricValuesTypeRef =
+ new TypeReference<Map<Long, Double>>() {};
+ // Provider of the JDBC connections handed out by getConnection().
+ private final ConnectionProvider dataSource;
+
+ /**
+ * Creates an accessor whose connections come from a
+ * DefaultPhoenixDataSource built from the given HBase configuration.
+ *
+ * @param hbaseConf HBase configuration, also passed to the default data source
+ * @param metricsConf metrics configuration
+ */
+ public PhoenixHBaseAccessor(Configuration hbaseConf,
+ Configuration metricsConf){
+ this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf));
+ }
+
+ public PhoenixHBaseAccessor(Configuration hbaseConf,
+ Configuration metricsConf,
+ ConnectionProvider dataSource) {
+ this.hbaseConf = hbaseConf;
+ this.metricsConf = metricsConf;
+ RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, 5760);
+ try {
+ Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+ } catch (ClassNotFoundException e) {
+ LOG.error("Phoenix client jar not found in the classpath.", e);
+ throw new IllegalStateException(e);
+ }
+ this.dataSource = dataSource;
+ this.retryCounterFactory = new RetryCounterFactory(
+ metricsConf.getInt(GLOBAL_MAX_RETRIES, 10),
+ (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5)));
+ }
+
+
+ private Connection getConnectionRetryingOnException()
+ throws SQLException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ while (true) {
+ try{
+ return getConnection();
+ } catch (SQLException e) {
+ if(!retryCounter.shouldRetry()){
+ LOG.error("HBaseAccessor getConnection failed after "
+ + retryCounter.getMaxAttempts() + " attempts");
+ throw e;
+ }
+ }
+ retryCounter.sleepUntilNextRetry();
+ }
+ }
+
+
+ /**
+ * Get a JDBC connection to the HBase store from the configured data source.
+ * The assumption is that the hbase configuration is present on the classpath
+ * and has been loaded by the caller into the Configuration object.
+ * Phoenix already caches the HConnection between the client and the HBase
+ * cluster.
+ *
+ * @return a {@link java.sql.Connection} supplied by the data source
+ * @throws SQLException if the data source cannot provide a connection
+ */
+ public Connection getConnection() throws SQLException {
+ return dataSource.getConnection();
+ }
+
+ /**
+ * Deserializes a JSON string using the shared mapper and the
+ * Map&lt;Long, Double&gt; type reference.
+ * NOTE(review): the return type is a raw Map, so callers have to cast.
+ *
+ * @param json JSON text to parse
+ * @return the parsed map
+ * @throws IOException if the mapper fails to read the value
+ */
+ public static Map readMetricFromJSON(String json) throws IOException {
+ return mapper.readValue(json, metricValuesTypeRef);
+ }
+
+ @SuppressWarnings("unchecked")
+ static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
+ throws SQLException, IOException {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setMetricName(rs.getString("METRIC_NAME"));
+ metric.setAppId(rs.getString("APP_ID"));
+ metric.setInstanceId(rs.getString("INSTANCE_ID"));
+ metric.setHostName(rs.getString("HOSTNAME"));
+ metric.setTimestamp(rs.getLong("SERVER_TIME"));
+ metric.setStartTime(rs.getLong("START_TIME"));
+ metric.setType(rs.getString("UNITS"));
+ metric.setMetricValues(
+ (Map<Long, Double>) readMetricFromJSON(rs.getString("METRICS")));
+ return metric;
+ }
+
+ static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
+ throws SQLException, IOException {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setMetricName(rs.getString("METRIC_NAME"));
+ metric.setAppId(rs.getString("APP_ID"));
+ metric.setInstanceId(rs.getString("INSTANCE_ID"));
+ metric.setHostName(rs.getString("HOSTNAME"));
+ metric.setTimestamp(rs.getLong("SERVER_TIME"));
+ metric.setType(rs.getString("UNITS"));
+ return metric;
+ }
+
+ static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
+ throws SQLException {
+ MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+ metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
+ metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+ metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+ metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+
+ metricHostAggregate.setDeviation(0.0);
+ return metricHostAggregate;
+ }
+
+  /**
+   * Builds the cluster-level metric key (name, app, instance, server time,
+   * units) from the current row of the result set.
+   *
+   * @param rs result set positioned on the row to read
+   * @return cluster metric key for the row
+   * @throws SQLException if a column cannot be read
+   * @throws IOException declared by the signature; not thrown by this body
+   */
+  static TimelineClusterMetric
+      getTimelineMetricClusterKeyFromResultSet(ResultSet rs)
+      throws SQLException, IOException {
+    return new TimelineClusterMetric(
+        rs.getString("METRIC_NAME"),
+        rs.getString("APP_ID"),
+        rs.getString("INSTANCE_ID"),
+        rs.getLong("SERVER_TIME"),
+        rs.getString("UNITS"));
+  }
+
+ static MetricClusterAggregate
+ getMetricClusterAggregateFromResultSet(ResultSet rs)
+ throws SQLException {
+ MetricClusterAggregate agg = new MetricClusterAggregate();
+ agg.setSum(rs.getDouble("METRIC_SUM"));
+ agg.setMax(rs.getDouble("METRIC_MAX"));
+ agg.setMin(rs.getDouble("METRIC_MIN"));
+ agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));
+
+ agg.setDeviation(0.0);
+
+ return agg;
+ }
+
+ protected void initMetricSchema() {
+ Connection conn = null;
+ Statement stmt = null;
+
+ String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
+ String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);
+ String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");
+ String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");
+ String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");
+ String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000");
+ String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");
+
+ try {
+ LOG.info("Initializing metrics schema...");
+ conn = getConnectionRetryingOnException();
+ stmt = conn.createStatement();
+
+ stmt.executeUpdate(String.format(CREATE_METRICS_TABLE_SQL,
+ encoding, precisionTtl, compression));
+ stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL,
+ encoding, hostHourTtl, compression));
+ stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL,
+ encoding, hostMinTtl, compression));
+ stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
+ encoding, clusterMinTtl, compression));
+ stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+ encoding, clusterHourTtl, compression));
+ conn.commit();
+ } catch (SQLException sql) {
+ LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", sql);
+ throw new MetricsInitializationException(
+ "Error creating Metrics Schema in HBase using Phoenix.", sql);
+ } catch (InterruptedException e) {
+ LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", e);
+ throw new MetricsInitializationException(
+ "Error creating Metrics Schema in HBase using Phoenix.", e);
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+
+  /**
+   * Upserts every metric of the request into the precision table
+   * (METRICS_RECORD_TABLE_NAME) and commits once at the end. All rows share
+   * one server time, taken when the method starts. A row whose upsert fails
+   * is logged and skipped; the remaining rows are still written.
+   *
+   * @param metrics metrics to store; a null or empty metric list is a no-op
+   * @throws SQLException if the connection, the statement preparation or the
+   *         commit fails
+   * @throws IOException declared for the JSON serialization of metric values
+   */
+  public void insertMetricRecords(TimelineMetrics metrics)
+      throws SQLException, IOException {
+
+    List<TimelineMetric> timelineMetrics = metrics.getMetrics();
+    if (timelineMetrics == null || timelineMetrics.isEmpty()) {
+      LOG.debug("Empty metrics insert request.");
+      return;
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement metricRecordStmt = null;
+    long currentTime = System.currentTimeMillis();
+
+    try {
+      metricRecordStmt = conn.prepareStatement(String.format(
+          UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
+
+      for (TimelineMetric metric : timelineMetrics) {
+        metricRecordStmt.clearParameters();
+
+        // Guarded so the message is only built when trace logging is on.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("host: " + metric.getHostName() + ", " +
+              "metricName = " + metric.getMetricName() + ", " +
+              "values: " + metric.getMetricValues());
+        }
+        Aggregator agg = new Aggregator();
+        double[] aggregates = agg.calculateAggregates(
+            metric.getMetricValues());
+
+        metricRecordStmt.setString(1, metric.getMetricName());
+        metricRecordStmt.setString(2, metric.getHostName());
+        metricRecordStmt.setString(3, metric.getAppId());
+        metricRecordStmt.setString(4, metric.getInstanceId());
+        metricRecordStmt.setLong(5, currentTime);
+        metricRecordStmt.setLong(6, metric.getStartTime());
+        metricRecordStmt.setString(7, metric.getType());
+        // Positions 8-11 are METRIC_SUM, METRIC_MAX, METRIC_MIN and
+        // METRIC_COUNT in UPSERT_METRICS_SQL.
+        // NOTE(review): assumes calculateAggregates returns them in this order.
+        metricRecordStmt.setDouble(8, aggregates[0]);
+        metricRecordStmt.setDouble(9, aggregates[1]);
+        metricRecordStmt.setDouble(10, aggregates[2]);
+        metricRecordStmt.setLong(11, (long)aggregates[3]);
+        String json =
+            TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+        metricRecordStmt.setString(12, json);
+
+        try {
+          metricRecordStmt.executeUpdate();
+        } catch (SQLException sql) {
+          // Best effort: keep going, but log which row failed and pass the
+          // exception as the throwable so its stack trace is recorded.
+          LOG.error("Failed to upsert metric record, metricName = "
+              + metric.getMetricName() + ", host = " + metric.getHostName(),
+              sql);
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (metricRecordStmt != null) {
+        try {
+          metricRecordStmt.close();
+        } catch (SQLException e) {
+          // Ignore: nothing useful can be done if close fails.
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore: nothing useful can be done if close fails.
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Runs the metrics query built from the condition and collects the
+   * resulting rows. For a grouped condition rows are merged into existing
+   * metrics; otherwise every row is appended as its own metric.
+   *
+   * @param condition filter criteria; must not be empty
+   * @return the metrics read from the store
+   * @throws SQLException if the condition is empty or the query fails
+   * @throws IOException if the stored metric values cannot be parsed
+   */
+  @SuppressWarnings("unchecked")
+  public TimelineMetrics getMetricRecords(final Condition condition)
+      throws SQLException, IOException {
+
+    if (condition.isEmpty()) {
+      throw new SQLException("No filter criteria specified.");
+    }
+
+    final Connection connection = getConnection();
+    PreparedStatement query = null;
+    final TimelineMetrics result = new TimelineMetrics();
+
+    try {
+      query = PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+      final ResultSet rows = query.executeQuery();
+
+      while (rows.next()) {
+        final TimelineMetric current = getTimelineMetricFromResultSet(rows);
+        if (!condition.isGrouped()) {
+          result.getMetrics().add(current);
+        } else {
+          result.addOrMergeTimelineMetric(current);
+        }
+      }
+    } finally {
+      if (query != null) {
+        try {
+          query.close();
+        } catch (SQLException ignored) {
+          // Ignore
+        }
+      }
+      if (connection != null) {
+        try {
+          connection.close();
+        } catch (SQLException ignored) {
+          // Ignore
+        }
+      }
+    }
+    return result;
+  }
+
+ public void saveHostAggregateRecords(Map<TimelineMetric,
+ MetricHostAggregate> hostAggregateMap, String phoenixTableName)
+ throws SQLException {
+
+ if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) {
+ Connection conn = getConnection();
+ PreparedStatement stmt = null;
+
+ long start = System.currentTimeMillis();
+ int rowCount = 0;
+
+ try {
+ stmt = conn.prepareStatement(
+ String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
+
+ for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
+ hostAggregateMap.entrySet()) {
+
+ TimelineMetric metric = metricAggregate.getKey();
+ MetricHostAggregate hostAggregate = metricAggregate.getValue();
+
+ rowCount++;
+ stmt.clearParameters();
+ stmt.setString(1, metric.getMetricName());
+ stmt.setString(2, metric.getHostName());
+ stmt.setString(3, metric.getAppId());
+ stmt.setString(4, metric.getInstanceId());
+ stmt.setLong(5, metric.getTimestamp());
+ stmt.setString(6, metric.getType());
+ stmt.setDouble(7, hostAggregate.getSum());
+ stmt.setDouble(8, hostAggregate.getMax());
+ stmt.setDouble(9, hostAggregate.getMin());
+ stmt.setDouble(10, hostAggregate.getNumberOfSamples());
+
+ try {
+ // TODO: Why this exception is swallowed
+ stmt.executeUpdate();
+ } catch (SQLException sql) {
+ LOG.error(sql);
+ }
+
+ if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+ conn.commit();
+ rowCount = 0;
+ }
+
+ }
+
+ conn.commit();
+
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
+ }
+ }
+
+ long end = System.currentTimeMillis();
+
+ if ((end - start) > 60000l) {
+ LOG.info("Time to save map: " + (end - start) + ", " +
+ "thread = " + Thread.currentThread().getClass());
+ }
+ }
+ }
+
+  /**
+   * Upserts cluster-level aggregates (sum, host count, max, min) using
+   * UPSERT_CLUSTER_AGGREGATE_SQL. An intermediate commit is issued whenever
+   * the number of pending rows reaches PHOENIX_MAX_MUTATION_STATE_SIZE - 1,
+   * followed by a final commit. A row whose upsert fails is logged and
+   * skipped. Saves that take longer than 60 seconds are reported at info
+   * level.
+   *
+   * @param records aggregates keyed by cluster metric; null or empty is a
+   *        no-op
+   * @throws SQLException if the connection, the statement preparation or a
+   *         commit fails
+   */
+  public void saveClusterAggregateRecords(
+      Map<TimelineClusterMetric, MetricClusterAggregate> records)
+      throws SQLException {
+
+    if (records == null || records.isEmpty()) {
+      LOG.debug("Empty aggregate records.");
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+      int rowCount = 0;
+
+      for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+          aggregateEntry : records.entrySet()) {
+        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+        MetricClusterAggregate aggregate = aggregateEntry.getValue();
+
+        // Guarded so the message is only built when trace logging is on.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("clusterMetric = " + clusterMetric + ", " +
+              "aggregate = " + aggregate);
+        }
+
+        rowCount++;
+        stmt.clearParameters();
+        stmt.setString(1, clusterMetric.getMetricName());
+        stmt.setString(2, clusterMetric.getAppId());
+        stmt.setString(3, clusterMetric.getInstanceId());
+        stmt.setLong(4, clusterMetric.getTimestamp());
+        stmt.setString(5, clusterMetric.getType());
+        stmt.setDouble(6, aggregate.getSum());
+        stmt.setInt(7, aggregate.getNumberOfHosts());
+        stmt.setDouble(8, aggregate.getMax());
+        stmt.setDouble(9, aggregate.getMin());
+
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          // Best effort: keep going, but log which row failed and pass the
+          // exception as the throwable so its stack trace is recorded.
+          LOG.error("Failed to upsert cluster aggregate, clusterMetric = "
+              + clusterMetric, sql);
+        }
+
+        // Flush before the pending mutations exceed the configured maximum.
+        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+          conn.commit();
+          rowCount = 0;
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore: nothing useful can be done if close fails.
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore: nothing useful can be done if close fails.
+        }
+      }
+    }
+    long end = System.currentTimeMillis();
+    if ((end - start) > 60000L) {
+      LOG.info("Time to save: " + (end - start) + ", " +
+          "thread = " + Thread.currentThread().getName());
+    }
+  }
+
+  /**
+   * Upserts time-aggregated cluster metrics (sum, sample count, max, min)
+   * into the given table using UPSERT_CLUSTER_AGGREGATE_TIME_SQL. An
+   * intermediate commit is issued whenever the number of pending rows
+   * reaches PHOENIX_MAX_MUTATION_STATE_SIZE - 1, followed by a final commit.
+   * A row whose upsert fails is logged and skipped. Saves that take longer
+   * than 60 seconds are reported at info level.
+   *
+   * @param records aggregates keyed by cluster metric; null or empty is a
+   *        no-op
+   * @param tableName table to upsert into
+   * @throws SQLException if the connection, the statement preparation or a
+   *         commit fails
+   */
+  public void saveClusterAggregateHourlyRecords(
+      Map<TimelineClusterMetric, MetricHostAggregate> records,
+      String tableName)
+      throws SQLException {
+    if (records == null || records.isEmpty()) {
+      LOG.debug("Empty aggregate records.");
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(String.format
+          (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
+      int rowCount = 0;
+
+      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
+          aggregateEntry : records.entrySet()) {
+        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+        MetricHostAggregate aggregate = aggregateEntry.getValue();
+
+        // Guarded so the message is only built when trace logging is on.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("clusterMetric = " + clusterMetric + ", " +
+              "aggregate = " + aggregate);
+        }
+
+        rowCount++;
+        stmt.clearParameters();
+        stmt.setString(1, clusterMetric.getMetricName());
+        stmt.setString(2, clusterMetric.getAppId());
+        stmt.setString(3, clusterMetric.getInstanceId());
+        stmt.setLong(4, clusterMetric.getTimestamp());
+        stmt.setString(5, clusterMetric.getType());
+        stmt.setDouble(6, aggregate.getSum());
+        // Position 7 is METRIC_COUNT here: the number of samples, not the
+        // number of hosts.
+        stmt.setLong(7, aggregate.getNumberOfSamples());
+        stmt.setDouble(8, aggregate.getMax());
+        stmt.setDouble(9, aggregate.getMin());
+
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          // Best effort: keep going, but log which row failed and pass the
+          // exception as the throwable so its stack trace is recorded.
+          LOG.error("Failed to upsert cluster aggregate into " + tableName
+              + ", clusterMetric = " + clusterMetric, sql);
+        }
+
+        // Flush before the pending mutations exceed the configured maximum.
+        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+          conn.commit();
+          rowCount = 0;
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore: nothing useful can be done if close fails.
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore: nothing useful can be done if close fails.
+        }
+      }
+    }
+    long end = System.currentTimeMillis();
+    if ((end - start) > 60000L) {
+      LOG.info("Time to save: " + (end - start) + ", " +
+          "thread = " + Thread.currentThread().getName());
+    }
+  }
+
+
+ public TimelineMetrics getAggregateMetricRecords(final Condition condition)
+ throws SQLException {
+
+ if (condition.isEmpty()) {
+ throw new SQLException("No filter criteria specified.");
+ }
+
+ Connection conn = getConnection();
+ PreparedStatement stmt = null;
+ TimelineMetrics metrics = new TimelineMetrics();
+
+ try {
+ stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
+
+ ResultSet rs = stmt.executeQuery();
+
+ while (rs.next()) {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setMetricName(rs.getString("METRIC_NAME"));
+ metric.setAppId(rs.getString("APP_ID"));
+ metric.setInstanceId(rs.getString("INSTANCE_ID"));
+ metric.setTimestamp(rs.getLong("SERVER_TIME"));
+ metric.setStartTime(rs.getLong("SERVER_TIME"));
+ Map<Long, Double> valueMap = new HashMap<Long, Double>();
+ valueMap.put(rs.getLong("SERVER_TIME"),
+ rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
+ metric.setMetricValues(valueMap);
+
+ if (condition.isGrouped()) {
+ metrics.addOrMergeTimelineMetric(metric);
+ } else {
+ metrics.getMetrics().add(metric);
+ }
+ }
+
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
+ }
+ }
+ LOG.info("Aggregate records size: " + metrics.getMetrics().size());
+ return metrics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
new file mode 100644
index 0000000..0d53f5f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Encapsulate all metrics related SQL queries.
+ */
+public class PhoenixTransactSQL {
+
+ static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+ // Each CREATE statement below takes three String.format arguments, in
+ // order: DATA_BLOCK_ENCODING, TTL and COMPRESSION.
+ /**
+ * Create table to store individual metric records. Primary key:
+ * METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, INSTANCE_ID.
+ */
+ public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
+ "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " +
+ "HOSTNAME VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "START_TIME UNSIGNED_LONG, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE, " +
+ "METRIC_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE, " +
+ "METRIC_MIN DOUBLE, " +
+ "METRICS VARCHAR CONSTRAINT pk " +
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " +
+ "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+ "TTL=%s, COMPRESSION='%s'";
+
+ /**
+ * Create table METRIC_RECORD_HOURLY for per-host aggregates. Primary key:
+ * METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME.
+ */
+ public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
+ "(METRIC_NAME VARCHAR, " +
+ "HOSTNAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE," +
+ "METRIC_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE," +
+ "METRIC_MIN DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+ "TTL=%s, COMPRESSION='%s'";
+
+ /**
+ * Create table METRIC_RECORD_MINUTE for per-host aggregates; same columns
+ * and primary key as METRIC_RECORD_HOURLY.
+ */
+ public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
+ "(METRIC_NAME VARCHAR, " +
+ "HOSTNAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE," +
+ "METRIC_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE," +
+ "METRIC_MIN DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
+ " COMPRESSION='%s'";
+
+ /**
+ * Create table METRIC_AGGREGATE for cluster-level aggregates. It has no
+ * HOSTNAME column and stores HOSTS_COUNT instead of METRIC_COUNT. Primary
+ * key: METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME.
+ */
+ public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
+ "(METRIC_NAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE, " +
+ "HOSTS_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE, " +
+ "METRIC_MIN DOUBLE " +
+ "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+ "TTL=%s, COMPRESSION='%s'";
+
+ /**
+ * Create table METRIC_AGGREGATE_HOURLY for cluster-level aggregates; like
+ * METRIC_AGGREGATE but with METRIC_COUNT in place of HOSTS_COUNT.
+ */
+ public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
+ "(METRIC_NAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE, " +
+ "METRIC_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE, " +
+ "METRIC_MIN DOUBLE " +
+ "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+ "TTL=%s, COMPRESSION='%s'";
+
+ /**
+ * Insert into metric records table. The table name is a %s placeholder;
+ * the statement takes twelve bind parameters in the listed column order.
+ */
+ public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
+ "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN, " +
+ "METRIC_COUNT, " +
+ "METRICS) VALUES " +
+ "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ /**
+ * Insert into the METRIC_AGGREGATE table (fixed table name); nine bind
+ * parameters, the seventh being HOSTS_COUNT.
+ */
+ public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
+ "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "HOSTS_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ /**
+ * Insert a cluster aggregate into the table given by the %s placeholder;
+ * nine bind parameters, the seventh being METRIC_COUNT.
+ */
+ public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" +
+ " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+
+ /**
+ * Insert a host aggregate into the table given by the %s placeholder; ten
+ * bind parameters, with METRIC_COUNT last.
+ */
+ public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
+ "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN," +
+ "METRIC_COUNT) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ /**
+ * Retrieve a set of rows from metrics records table. Takes two
+ * String.format arguments: a query hint placed right after SELECT, and the
+ * table name.
+ */
+ public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " +
+ "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN, " +
+ "METRIC_COUNT, " +
+ "METRICS " +
+ "FROM %s";
+
+ /**
+ * Like GET_METRIC_SQL but without the START_TIME and METRICS columns.
+ * Takes the same two String.format arguments.
+ */
+ public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " +
+ "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN, " +
+ "METRIC_COUNT " +
+ "FROM %s";
+
+ /**
+ * Retrieve cluster aggregates from the METRIC_AGGREGATE table.
+ * NOTE(review): contains a %s placeholder after SELECT, yet
+ * prepareGetAggregateSqlStmt appears to use this string without
+ * String.format - confirm that a literal "%s" does not reach the query.
+ */
+ public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " +
+ "METRIC_NAME, APP_ID, " +
+ "INSTANCE_ID, SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "HOSTS_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN " +
+ "FROM METRIC_AGGREGATE";
+
+ // Table names; they match the names used in the CREATE statements above.
+ public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
+ public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
+ "METRIC_RECORD_MINUTE";
+ public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
+ "METRIC_RECORD_HOURLY";
+ public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
+ "METRIC_AGGREGATE";
+ public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
+ "METRIC_AGGREGATE_HOURLY";
+ // Defaults used when the configuration names no compression or encoding.
+ public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
+ public static final String DEFAULT_ENCODING = "FAST_DIFF";
+ // Subtracted from the query start time when building the time range hint.
+ public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
+
+  /**
+   * Builds the filter that optimizes the HBase scan by using file
+   * timestamps, which prevents a full table scan of metric records. The
+   * hint carries the value {@code startTime - delta}.
+   *
+   * @param startTime start time of the query; may be null
+   * @param delta amount subtracted from the start time; may be null
+   * @return Phoenix hint string, or an empty string when either argument is
+   *         null (the query then simply runs without the hint)
+   */
+  public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
+    // Unboxing a null Long would throw a NullPointerException; a query
+    // without a start time just gets no hint.
+    if (startTime == null || delta == null) {
+      return "";
+    }
+    return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta));
+  }
+
+  /**
+   * Builds and binds the SELECT statement for a metrics query. The select
+   * clause is the condition's own statement if it has one, otherwise
+   * GET_METRIC_SQL on the precision table with a time range hint. The
+   * WHERE clause, ordering (default: METRIC_NAME, SERVER_TIME) and optional
+   * LIMIT come from the condition. Parameters are bound in this order:
+   * metric names, hostname, app id, instance id, start time, end time -
+   * each only when present.
+   *
+   * @param connection connection used to prepare the statement
+   * @param condition filter criteria; must not be empty
+   * @return the prepared statement with all parameters bound
+   * @throws SQLException if preparing or binding fails
+   * @throws IllegalArgumentException if the condition is empty
+   */
+  public static PreparedStatement prepareGetMetricsSqlStmt(
+      Connection connection, Condition condition) throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new IllegalArgumentException("Condition is empty.");
+    }
+
+    String selectClause = condition.getStatement();
+    if (selectClause == null) {
+      selectClause = String.format(GET_METRIC_SQL,
+          getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA),
+          METRICS_RECORD_TABLE_NAME);
+    }
+
+    StringBuilder query = new StringBuilder(selectClause);
+    query.append(" WHERE ").append(condition.getConditionClause());
+
+    String orderByClause = condition.getOrderByClause();
+    query.append(orderByClause != null
+        ? orderByClause
+        : " ORDER BY METRIC_NAME, SERVER_TIME ");
+
+    if (condition.getLimit() != null) {
+      query.append(" LIMIT ").append(condition.getLimit());
+    }
+
+    String sql = query.toString();
+    LOG.debug("SQL: " + sql + ", condition: " + condition);
+    PreparedStatement stmt = connection.prepareStatement(sql);
+
+    // Next JDBC parameter index to bind (1-based).
+    int pos = 1;
+
+    List<String> metricNames = condition.getMetricNames();
+    if (metricNames != null) {
+      for (String metricName : metricNames) {
+        LOG.debug("Setting pos: " + pos + ", value = " + metricName);
+        stmt.setString(pos, metricName);
+        pos++;
+      }
+    }
+
+    String hostname = condition.getHostname();
+    if (hostname != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + hostname);
+      stmt.setString(pos++, hostname);
+    }
+
+    // TODO: Upper case all strings on POST
+    String appId = condition.getAppId();
+    if (appId != null) {
+      // TODO: fix case of appId coming from host metrics
+      // "HOST" is kept verbatim; every other app id is lower-cased.
+      if (!appId.equals("HOST")) {
+        appId = appId.toLowerCase();
+      }
+      LOG.debug("Setting pos: " + pos + ", value: " + appId);
+      stmt.setString(pos++, appId);
+    }
+
+    String instanceId = condition.getInstanceId();
+    if (instanceId != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + instanceId);
+      stmt.setString(pos++, instanceId);
+    }
+
+    Long startTime = condition.getStartTime();
+    if (startTime != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + startTime);
+      stmt.setLong(pos++, startTime);
+    }
+
+    if (condition.getEndTime() != null) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
+      stmt.setLong(pos, condition.getEndTime());
+    }
+
+    if (condition.getFetchSize() != null) {
+      stmt.setFetchSize(condition.getFetchSize());
+    }
+
+    return stmt;
+  }
+
+
+ public static PreparedStatement prepareGetAggregateSqlStmt(
+ Connection connection, Condition condition) throws SQLException {
+
+ if (condition.isEmpty()) {
+ throw new IllegalArgumentException("Condition is empty.");
+ }
+
+ StringBuilder sb = new StringBuilder(GET_CLUSTER_AGGREGATE_SQL);
+ sb.append(" WHERE ");
+ sb.append(condition.getConditionClause());
+ sb.append(" ORDER BY METRIC_NAME, SERVER_TIME");
+ if (condition.getLimit() != null) {
+ sb.append(" LIMIT ").append(condition.getLimit());
+ }
+
+ LOG.debug("SQL => " + sb.toString() + ", condition => " + condition);
+ PreparedStatement stmt = connection.prepareStatement(sb.toString());
+ int pos = 1;
+ if (condition.getMetricNames() != null) {
+ for (; pos <= condition.getMetricNames().size(); pos++) {
+ stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+ }
+ }
+ // TODO: Upper case all strings on POST
+ if (condition.getAppId() != null) {
+ stmt.setString(pos++, condition.getAppId().toLowerCase());
+ }
+ if (condition.getInstanceId() != null) {
+ stmt.setString(pos++, condition.getInstanceId());
+ }
+ if (condition.getStartTime() != null) {
+ stmt.setLong(pos++, condition.getStartTime());
+ }
+ if (condition.getEndTime() != null) {
+ stmt.setLong(pos, condition.getEndTime());
+ }
+
+ return stmt;
+ }
+
+ /**
+  * Filter describing which metric rows a query should select. Every
+  * criterion is optional: a null (or, for strings and the metric name list,
+  * empty) criterion is reported as {@code null} by its getter and is left
+  * out of the generated WHERE clause.
+  */
+ static class Condition {
+   List<String> metricNames;
+   String hostname;
+   String appId;
+   String instanceId;
+   Long startTime;
+   Long endTime;
+   Integer limit;
+   boolean grouped;
+   boolean noLimit = false;
+   Integer fetchSize;
+   String statement;
+   // LinkedHashSet: keeps insertion order and drops duplicate columns.
+   Set<String> orderByColumns = new LinkedHashSet<String>();
+
+   Condition(List<String> metricNames, String hostname, String appId,
+             String instanceId, Long startTime, Long endTime, Integer limit,
+             boolean grouped) {
+     this.metricNames = metricNames;
+     this.hostname = hostname;
+     this.appId = appId;
+     this.instanceId = instanceId;
+     this.startTime = startTime;
+     this.endTime = endTime;
+     this.limit = limit;
+     this.grouped = grouped;
+   }
+
+   /** @return the custom base statement, or {@code null} if none was set */
+   String getStatement() {
+     return statement;
+   }
+
+   void setStatement(String statement) {
+     this.statement = statement;
+   }
+
+   /** @return the metric names, or {@code null} when none were given */
+   List<String> getMetricNames() {
+     return metricNames == null || metricNames.isEmpty() ? null : metricNames;
+   }
+
+   /**
+    * @return a parenthesized, comma separated list with one {@code ?}
+    *         placeholder per metric name, or {@code null} when the metric
+    *         name list is {@code null}
+    */
+   String getMetricsClause() {
+     if (metricNames == null) {
+       return null;
+     }
+     StringBuilder sb = new StringBuilder("(");
+     for (int i = 0; i < metricNames.size(); i++) {
+       if (i > 0) {
+         sb.append(", ");
+       }
+       sb.append("?");
+     }
+     sb.append(")");
+     return sb.toString();
+   }
+
+   /**
+    * Builds the body of the WHERE clause (without the WHERE keyword). Only
+    * criteria that are set contribute a predicate; predicates are joined
+    * with {@code AND}, so the result never ends in a dangling conjunction.
+    * Placeholder order is: metric names, hostname, appId, instanceId,
+    * start time, end time.
+    *
+    * @return the predicate list, or an empty string when nothing is set
+    */
+   String getConditionClause() {
+     StringBuilder sb = new StringBuilder();
+
+     if (getMetricNames() != null) {
+       appendClause(sb, "METRIC_NAME IN " + getMetricsClause());
+     }
+     if (getHostname() != null) {
+       appendClause(sb, "HOSTNAME = ?");
+     }
+     if (getAppId() != null) {
+       appendClause(sb, "APP_ID = ?");
+     }
+     if (getInstanceId() != null) {
+       appendClause(sb, "INSTANCE_ID = ?");
+     }
+     if (getStartTime() != null) {
+       appendClause(sb, "SERVER_TIME >= ?");
+     }
+     if (getEndTime() != null) {
+       appendClause(sb, "SERVER_TIME < ?");
+     }
+     return sb.toString();
+   }
+
+   /** Appends a predicate, preceded by " AND " unless it is the first. */
+   private static void appendClause(StringBuilder sb, String clause) {
+     if (sb.length() > 0) {
+       sb.append(" AND ");
+     }
+     sb.append(clause);
+   }
+
+   String getHostname() {
+     return hostname == null || hostname.isEmpty() ? null : hostname;
+   }
+
+   String getAppId() {
+     return appId == null || appId.isEmpty() ? null : appId;
+   }
+
+   String getInstanceId() {
+     return instanceId == null || instanceId.isEmpty() ? null : instanceId;
+   }
+
+   /**
+    * Convert to millis.
+    *
+    * @return the start time in milliseconds, or {@code null} when unset
+    */
+   Long getStartTime() {
+     return toMillis(startTime);
+   }
+
+   /**
+    * Convert to millis.
+    *
+    * @return the end time in milliseconds, or {@code null} when unset
+    */
+   Long getEndTime() {
+     return toMillis(endTime);
+   }
+
+   /**
+    * Values below 9999999999 are multiplied by 1000 (treated as seconds);
+    * larger values are returned unchanged. A null input stays null instead
+    * of failing on unboxing.
+    */
+   private static Long toMillis(Long time) {
+     if (time == null) {
+       return null;
+     }
+     if (time < 9999999999L) {
+       return time * 1000;
+     }
+     return time;
+   }
+
+   /** Disables the LIMIT clause; {@link #getLimit()} then returns null. */
+   void setNoLimit() {
+     this.noLimit = true;
+   }
+
+   /**
+    * @return {@code null} when the limit was disabled, otherwise the
+    *         configured limit, falling back to
+    *         {@code PhoenixHBaseAccessor.RESULTSET_LIMIT} when none was given
+    */
+   Integer getLimit() {
+     if (noLimit) {
+       return null;
+     }
+     return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
+   }
+
+   boolean isGrouped() {
+     return grouped;
+   }
+
+   /** @return true when no filter criterion at all is set */
+   boolean isEmpty() {
+     return (metricNames == null || metricNames.isEmpty())
+       && (hostname == null || hostname.isEmpty())
+       && (appId == null || appId.isEmpty())
+       && (instanceId == null || instanceId.isEmpty())
+       && startTime == null
+       && endTime == null;
+   }
+
+   Integer getFetchSize() {
+     return fetchSize;
+   }
+
+   void setFetchSize(Integer fetchSize) {
+     this.fetchSize = fetchSize;
+   }
+
+   void addOrderByColumn(String column) {
+     orderByColumns.add(column);
+   }
+
+   /**
+    * @return {@code " ORDER BY c1, c2 "} built from the added columns in
+    *         insertion order, or {@code null} when no column was added
+    */
+   String getOrderByClause() {
+     if (orderByColumns.isEmpty()) {
+       return null;
+     }
+     StringBuilder sb = new StringBuilder(" ORDER BY ");
+     boolean first = true;
+     for (String orderByColumn : orderByColumns) {
+       if (!first) {
+         sb.append(", ");
+       }
+       sb.append(orderByColumn);
+       first = false;
+     }
+     sb.append(" ");
+     return sb.toString();
+   }
+
+   @Override
+   public String toString() {
+     return "Condition{" +
+       "metricNames=" + metricNames +
+       ", hostname='" + hostname + '\'' +
+       ", appId='" + appId + '\'' +
+       ", instanceId='" + instanceId + '\'' +
+       ", startTime=" + startTime +
+       ", endTime=" + endTime +
+       ", limit=" + limit +
+       ", grouped=" + grouped +
+       ", orderBy=" + orderByColumns +
+       ", noLimit=" + noLimit +
+       '}';
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
new file mode 100644
index 0000000..d227993
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+/**
+ * Immutable key identifying one cluster-level metric data point by metric
+ * name, application id, instance id and timestamp. The {@code type} is
+ * carried along but takes no part in {@link #equals(Object)},
+ * {@link #hashCode()} or {@link #toString()}.
+ * <p>
+ * {@code appId} and {@code instanceId} may be {@code null};
+ * {@code metricName} is dereferenced unconditionally and must not be.
+ */
+public class TimelineClusterMetric {
+  private final String metricName;
+  private final String appId;
+  private final String instanceId;
+  private final long timestamp;
+  private final String type;
+
+  TimelineClusterMetric(String metricName, String appId, String instanceId,
+                        long timestamp, String type) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.timestamp = timestamp;
+    this.type = type;
+  }
+
+  String getMetricName() {
+    return metricName;
+  }
+
+  String getAppId() {
+    return appId;
+  }
+
+  String getInstanceId() {
+    return instanceId;
+  }
+
+  long getTimestamp() {
+    return timestamp;
+  }
+
+  String getType() {
+    return type;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    TimelineClusterMetric that = (TimelineClusterMetric) o;
+    return timestamp == that.timestamp && equalsExceptTime(that);
+  }
+
+  /**
+   * Compares metric name, application id and instance id, ignoring the
+   * timestamp. A {@code null} appId or instanceId only matches another
+   * {@code null}, the same way {@link #equals(Object)} treats them.
+   *
+   * @param metric the metric to compare with; may be {@code null}
+   * @return true if all three identifying fields match; false otherwise,
+   *         including when {@code metric} is {@code null}
+   */
+  public boolean equalsExceptTime(TimelineClusterMetric metric) {
+    if (metric == null) {
+      return false;
+    }
+    return metricName.equals(metric.metricName)
+      && sameOrBothNull(appId, metric.appId)
+      && sameOrBothNull(instanceId, metric.instanceId);
+  }
+
+  /** Null-safe string equality. */
+  private static boolean sameOrBothNull(String a, String b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metricName.hashCode();
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineClusterMetric{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", timestamp=" + timestamp +
+      '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
new file mode 100644
index 0000000..cab154b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
+/**
+ * Aggregator that queries rows from an input table, merges consecutive rows
+ * whose metric keys satisfy {@code equalsExceptTime} into one
+ * {@link MetricHostAggregate}, and saves the aggregates to an output table.
+ * All settings are supplied through the constructor.
+ */
+public class TimelineMetricAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog
+    (TimelineMetricAggregator.class);
+
+  /** Columns the input query is ordered by, in this order. */
+  private static final String[] ORDER_BY_COLUMNS = {
+    "METRIC_NAME", "HOSTNAME", "APP_ID", "INSTANCE_ID", "SERVER_TIME"
+  };
+
+  private final String checkpointLocation;
+  private final Long sleepIntervalMillis;
+  private final Integer checkpointCutOffMultiplier;
+  private final String hostAggregatorDisabledParam;
+  private final String tableName;
+  private final String outputTableName;
+  private final Long nativeTimeRangeDelay;
+
+  public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                  Configuration metricsConf,
+                                  String checkpointLocation,
+                                  Long sleepIntervalMillis,
+                                  Integer checkpointCutOffMultiplier,
+                                  String hostAggregatorDisabledParam,
+                                  String tableName,
+                                  String outputTableName,
+                                  Long nativeTimeRangeDelay) {
+    super(hBaseAccessor, metricsConf);
+    this.checkpointLocation = checkpointLocation;
+    this.sleepIntervalMillis = sleepIntervalMillis;
+    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
+    this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
+    this.tableName = tableName;
+    this.outputTableName = outputTableName;
+    this.nativeTimeRangeDelay = nativeTimeRangeDelay;
+  }
+
+  @Override
+  protected String getCheckpointLocation() {
+    return checkpointLocation;
+  }
+
+  @Override
+  protected Long getSleepIntervalMillis() {
+    return sleepIntervalMillis;
+  }
+
+  @Override
+  protected Integer getCheckpointCutOffMultiplier() {
+    return checkpointCutOffMultiplier;
+  }
+
+  /** Disabled when the configured boolean parameter is true (default false). */
+  @Override
+  protected boolean isDisabled() {
+    return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
+  }
+
+  /**
+   * Builds the query condition for the given time window: no row limit, the
+   * inherited fetch size, the aggregate-only statement against the input
+   * table, and a fixed ordering so that rows of one metric are adjacent.
+   */
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    String hint =
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay);
+    String sql = String.format(GET_METRIC_AGGREGATE_ONLY_SQL, hint, tableName);
+
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(sql);
+    for (String column : ORDER_BY_COLUMNS) {
+      condition.addOrderByColumn(column);
+    }
+    return condition;
+  }
+
+  /**
+   * Aggregates every row of the result set and saves the aggregates to the
+   * output table. The time window arguments are not used here.
+   */
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> aggregates =
+      aggregateMetricsFromResultSet(rs);
+
+    LOG.info("Saving " + aggregates.size() + " metric aggregates.");
+    hBaseAccessor.saveHostAggregateRecords(aggregates, outputTableName);
+  }
+
+  /**
+   * Walks the result set once. A new aggregate is started, keyed by the
+   * current row's metric, on the first row and whenever the current row's
+   * metric no longer matches the running one under {@code equalsExceptTime};
+   * every row is then merged into the running aggregate.
+   */
+  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
+    (ResultSet rs) throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> result =
+      new HashMap<TimelineMetric, MetricHostAggregate>();
+    TimelineMetric groupKey = null;
+    MetricHostAggregate groupAggregate = null;
+
+    while (rs.next()) {
+      TimelineMetric rowMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate rowAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      boolean startsNewGroup =
+        groupKey == null || !groupKey.equalsExceptTime(rowMetric);
+      if (startsNewGroup) {
+        groupKey = rowMetric;
+        groupAggregate = new MetricHostAggregate();
+        result.put(rowMetric, groupAggregate);
+      }
+      // Recalculate totals with the current row.
+      groupAggregate.updateAggregates(rowAggregate);
+    }
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..8b10079
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+
+/**
+ * Static factory for the two host-level {@link TimelineMetricAggregator}
+ * instances: one reading {@code METRICS_RECORD_TABLE_NAME} and writing
+ * {@code METRICS_AGGREGATE_MINUTE_TABLE_NAME}, and one reading that minute
+ * table and writing {@code METRICS_AGGREGATE_HOURLY_TABLE_NAME}.
+ */
+public class TimelineMetricAggregatorFactory {
+  private static final String MINUTE_CHECKPOINT_FILE_NAME =
+    "timeline-metrics-host-aggregator-checkpoint";
+  private static final String HOURLY_CHECKPOINT_FILE_NAME =
+    "timeline-metrics-host-aggregator-hourly-checkpoint";
+
+  /**
+   * Creates the aggregator that rolls metric records up into the minute
+   * aggregate table. Defaults: sleep interval 300 seconds (5 mins),
+   * checkpoint cut-off multiplier 3.
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    return buildAggregator(hBaseAccessor, metricsConf,
+      MINUTE_CHECKPOINT_FILE_NAME,
+      HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300L,
+      HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3,
+      HOST_AGGREGATOR_MINUTE_DISABLED,
+      METRICS_RECORD_TABLE_NAME,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME,
+      120000L);
+  }
+
+  /**
+   * Creates the aggregator that rolls the minute aggregate table up into the
+   * hourly aggregate table. Defaults: sleep interval 3600 seconds,
+   * checkpoint cut-off multiplier 2.
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    return buildAggregator(hBaseAccessor, metricsConf,
+      HOURLY_CHECKPOINT_FILE_NAME,
+      HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600L,
+      HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2,
+      HOST_AGGREGATOR_HOUR_DISABLED,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME,
+      METRICS_AGGREGATE_HOURLY_TABLE_NAME,
+      3600000L);
+  }
+
+  /**
+   * Reads the checkpoint directory, sleep interval (configured in seconds,
+   * converted to milliseconds) and checkpoint cut-off multiplier from the
+   * configuration, then constructs the aggregator.
+   */
+  private static TimelineMetricAggregator buildAggregator(
+      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+      String checkpointFileName,
+      String sleepIntervalKey, long defaultSleepIntervalSeconds,
+      String cutOffMultiplierKey, int defaultCutOffMultiplier,
+      String disabledParam,
+      String inputTableName, String outputTableName,
+      long nativeTimeRangeDelay) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation =
+      FilenameUtils.concat(checkpointDir, checkpointFileName);
+
+    long sleepIntervalSeconds =
+      metricsConf.getLong(sleepIntervalKey, defaultSleepIntervalSeconds);
+    long sleepIntervalMillis = SECONDS.toMillis(sleepIntervalSeconds);
+
+    int checkpointCutOffMultiplier =
+      metricsConf.getInt(cutOffMultiplierKey, defaultCutOffMultiplier);
+
+    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      disabledParam,
+      inputTableName,
+      outputTableName,
+      nativeTimeRangeDelay);
+  }
+
+
+}