You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/11/14 20:06:20 UTC
ambari git commit: AMBARI-5707. Fix Average calculations in the
timeline service.
Repository: ambari
Updated Branches:
refs/heads/branch-metrics-dev c1b0d25af -> e5542aa5f
AMBARI-5707. Fix Average calculations in the timeline service.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e5542aa5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e5542aa5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e5542aa5
Branch: refs/heads/branch-metrics-dev
Commit: e5542aa5fa2fa9363a068fe4f18c86a23291162f
Parents: c1b0d25
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Fri Nov 14 11:01:25 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Fri Nov 14 11:01:25 2014 -0800
----------------------------------------------------------------------
.../pom.xml | 6 +-
.../timeline/AbstractTimelineAggregator.java | 30 +-
.../metrics/timeline/Aggregator.java | 60 ++++
.../metrics/timeline/ConnectionProvider.java | 29 ++
.../timeline/DefaultPhoenixDataSource.java | 80 +++++
.../metrics/timeline/PhoenixHBaseAccessor.java | 114 +++----
.../metrics/timeline/PhoenixTransactSQL.java | 161 ++++++----
.../metrics/loadsimulator/data/TestMetric.java | 6 +-
.../util/TestTimeStampProvider.java | 2 +-
.../metrics/timeline/TestHBaseAccessor.java | 294 +++++++++++++++++++
.../timeline/TestMetricHostAggregate.java | 50 ++++
11 files changed, 691 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
index 346b89d..51a7de7 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
@@ -252,9 +252,9 @@
</dependency>
<dependency>
- <groupId>org.easytesting</groupId>
- <artifactId>fest-assert</artifactId>
- <version>1.4</version>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>1.7.0</version>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
index 5b11b51..a13b591 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -273,6 +273,7 @@ public abstract class AbstractTimelineAggregator implements Runnable {
@Override
public String toString() {
+// MetricClusterAggregate
return "MetricAggregate{" +
"sum=" + sum +
", numberOfHosts=" + numberOfHosts +
@@ -289,11 +290,30 @@ public abstract class AbstractTimelineAggregator implements Runnable {
*/
public static class MetricHostAggregate extends MetricAggregate {
+ private long numberOfSamples = 0;
+
/**
 * Creates an empty host aggregate. This no-argument constructor is the one
 * annotated for JSON deserialization.
 */
@JsonCreator
public MetricHostAggregate() {
// NOTE(review): Double.MIN_VALUE is the smallest positive double, not the
// most negative one. If the third argument seeds the running maximum (the
// MetricAggregate constructor is not visible here - confirm), a series of
// negative samples could never replace it.
super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
}
+    /**
+     * Returns the recorded sample count, substituting 1 when no sample has
+     * been recorded yet.
+     *
+     * @return the sample count, never 0 for an untouched aggregate
+     */
+    @JsonProperty("numberOfSamples")
+    long getNumberOfSamples() {
+      if (numberOfSamples != 0) {
+        return numberOfSamples;
+      }
+      // Presumably substituted so that callers can divide by the result.
+      return 1;
+    }
+
+    /**
+     * Adds {@code count} to the running number of samples.
+     *
+     * @param count number of samples to add
+     */
+    void updateNumberOfSamples(long count) {
+      numberOfSamples = numberOfSamples + count;
+    }
+
+    /**
+     * Overwrites the running number of samples.
+     *
+     * @param numberOfSamples the new sample count
+     */
+    public void setNumberOfSamples(long numberOfSamples) {
+      this.numberOfSamples = numberOfSamples;
+    }
+
+    /**
+     * Returns the average of the aggregated samples: the accumulated sum
+     * divided by the sample count.
+     *
+     * @return sum divided by the number of samples
+     */
+    public double getAvg() {
+      // Divide by the getter rather than the raw field: the field is 0 until
+      // a sample is recorded, which would yield NaN or Infinity, whereas the
+      // getter substitutes 1 in that case.
+      return sum / getNumberOfSamples();
+    }
+
/**
* Find and update min, max and avg for a minute
*/
@@ -301,20 +321,14 @@ public abstract class AbstractTimelineAggregator implements Runnable {
updateMax(hostAggregate.getMax());
updateMin(hostAggregate.getMin());
updateSum(hostAggregate.getSum());
- }
-
- /**
- * Reuse sum to indicate average for a host for the hour
- */
- @Override
- void updateSum(Double sum) {
- this.sum = (this.sum + sum) / 2;
+ updateNumberOfSamples(hostAggregate.getNumberOfSamples());
}
@Override
public String toString() {
return "MetricHostAggregate{" +
"sum=" + sum +
+ ", numberOfSamples=" + numberOfSamples +
", deviation=" + deviation +
", max=" + max +
", min=" + min +
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java
new file mode 100644
index 0000000..f514298
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+
+import java.util.Map;
+
+/**
+ * Computes summary statistics (sum, max, min and sample count) over the
+ * datapoints of a single metric.
+ */
+public class Aggregator {
+
+  /**
+   * Calculates the aggregates over the values of the given map. Only the
+   * map's values are read; null values are skipped and not counted.
+   *
+   * @param metricValues datapoints to aggregate; may be null or empty
+   * @return a four element array: [0] sum, [1] max, [2] min and [3] the
+   *         number of non-null values. All four are 0 when there is nothing
+   *         to aggregate.
+   */
+  public double[] calculateAggregates(Map<Long, Double> metricValues) {
+    // Infinite sentinels so that any finite sample, including a negative
+    // one, replaces them. Double.MIN_VALUE is the smallest positive double
+    // and would hide the real maximum of an all-negative series.
+    double max = Double.NEGATIVE_INFINITY;
+    double min = Double.POSITIVE_INFINITY;
+    double sum = 0.0;
+    int metricCount = 0;
+
+    if (metricValues != null) {
+      for (Double value : metricValues.values()) {
+        // TODO: Some nulls in data - need to investigate null values from host
+        if (value == null) {
+          continue;
+        }
+        if (value > max) {
+          max = value;
+        }
+        if (value < min) {
+          min = value;
+        }
+        sum += value;
+        // Count only the samples that contributed to the sum; counting the
+        // skipped nulls as well would make sum / count understate the average.
+        metricCount++;
+      }
+    }
+
+    double[] values = new double[4];
+    values[0] = sum;
+    // An untouched sentinel means no usable sample was seen; report 0.0
+    // instead of the sentinel.
+    values[1] = max != Double.NEGATIVE_INFINITY ? max : 0.0;
+    values[2] = min != Double.POSITIVE_INFINITY ? min : 0.0;
+    values[3] = metricCount;
+
+    return values;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
new file mode 100644
index 0000000..87650af
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+
+import java.sql.Connection;
+
+/**
+ * Source of JDBC connections.
+ */
+public interface ConnectionProvider {
+
+  /**
+   * Obtains a JDBC connection.
+   *
+   * @return a connection; implementations may return null when no connection
+   *         could be opened (DefaultPhoenixDataSource does)
+   */
+  Connection getConnection();
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
new file mode 100644
index 0000000..c20dd14
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/**
+ * {@link ConnectionProvider} that opens Phoenix JDBC connections. The JDBC
+ * url is built once, at construction time, from the Zookeeper quorum, client
+ * port and znode parent read from the supplied HBase configuration.
+ */
+public class DefaultPhoenixDataSource implements ConnectionProvider {
+
+  static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
+  private static final String ZOOKEEPER_CLIENT_PORT =
+    "hbase.zookeeper.property.clientPort";
+  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+  private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
+  /** Url template; filled with quorum, client port and znode parent. */
+  private static final String CONNECTION_URL_FORMAT = "jdbc:phoenix:%s:%s:%s";
+  private final String url;
+
+  /**
+   * Builds the connection url from the given configuration. The client port
+   * defaults to "2181" and the znode parent to "/hbase" when not configured.
+   *
+   * @param hbaseConf configuration holding the Zookeeper settings
+   * @throws IllegalStateException if no Zookeeper quorum is configured
+   */
+  public DefaultPhoenixDataSource(Configuration hbaseConf) {
+    String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT,
+      "2181");
+    String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
+    String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase");
+    if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+      throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+        "access HBase store using Phoenix.");
+    }
+
+    url = String.format(CONNECTION_URL_FORMAT,
+      zookeeperQuorum,
+      zookeeperClientPort,
+      znodeParent);
+  }
+
+  /**
+   * Get JDBC connection to HBase store. Assumption is that the hbase
+   * configuration is present on the classpath and loaded by the caller into
+   * the Configuration object.
+   * Phoenix already caches the HConnection between the client and HBase
+   * cluster.
+   *
+   * @return a {@link java.sql.Connection}, or null when the connection could
+   *         not be opened (the failure is logged, not thrown)
+   */
+  @Override
+  public Connection getConnection() {
+    // Skip building the message when debug logging is off.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Metric store connection url: " + url);
+    }
+    try {
+      return DriverManager.getConnection(url);
+    } catch (SQLException e) {
+      // TODO: Exception is swallowed, it should be thrown - discuss it.
+      // Kept as a logged null return so existing callers see no change.
+      LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+      return null;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 1298973..2d53a0b 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -36,6 +36,7 @@ import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricClusterAggregate;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricHostAggregate;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
@@ -68,26 +69,31 @@ public class PhoenixHBaseAccessor {
private final Configuration hbaseConf;
private final Configuration metricsConf;
static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
- private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
- private static final String ZOOKEEPER_CLIENT_PORT =
- "hbase.zookeeper.property.clientPort";
- private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
- private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
/**
* 4 metrics/min * 60 * 24: Retrieve data for 1 day.
*/
- public static int RESULTSET_LIMIT = 5760;
+ private static final int METRICS_PER_MINUTE = 4;
+ public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) *
+ METRICS_PER_MINUTE;
private static ObjectMapper mapper;
static {
mapper = new ObjectMapper();
}
-
private static TypeReference<Map<Long, Double>> metricValuesTypeRef =
new TypeReference<Map<Long, Double>>() {};
+ private final ConnectionProvider dataSource;
- public PhoenixHBaseAccessor(Configuration hbaseConf, Configuration metricsConf) {
+  /**
+   * Creates an accessor whose connections come from a
+   * DefaultPhoenixDataSource built from {@code hbaseConf}.
+   *
+   * @param hbaseConf   HBase configuration, also handed to the data source
+   * @param metricsConf metrics configuration
+   */
+  public PhoenixHBaseAccessor(Configuration hbaseConf,
+                              Configuration metricsConf) {
+    this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf));
+  }
+
+ public PhoenixHBaseAccessor(Configuration hbaseConf,
+ Configuration metricsConf,
+ ConnectionProvider dataSource) {
this.hbaseConf = hbaseConf;
this.metricsConf = metricsConf;
RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, 5760);
@@ -97,6 +103,7 @@ public class PhoenixHBaseAccessor {
LOG.error("Phoenix client jar not found in the classpath.", e);
throw new IllegalStateException(e);
}
+ this.dataSource = dataSource;
}
/**
@@ -105,31 +112,11 @@ public class PhoenixHBaseAccessor {
* the Configuration object.
* Phoenix already caches the HConnection between the client and HBase
* cluster.
+ *
* @return @java.sql.Connection
*/
public Connection getConnection() {
- Connection connection = null;
- String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181");
- String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
- String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase");
-
- if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
- throw new IllegalStateException("Unable to find Zookeeper quorum to " +
- "access HBase store using Phoenix.");
- }
-
- String url = String.format(connectionUrl, zookeeperQuorum,
- zookeeperClientPort, znodeParent);
-
- LOG.debug("Metric store connection url: " + url);
-
- try {
- connection = DriverManager.getConnection(url);
- } catch (SQLException e) {
- LOG.warn("Unable to connect to HBase store using Phoenix.", e);
- }
-
- return connection;
+ return dataSource.getConnection();
}
public static Map readMetricFromJSON(String json) throws IOException {
@@ -138,7 +125,7 @@ public class PhoenixHBaseAccessor {
@SuppressWarnings("unchecked")
static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
- throws SQLException, IOException {
+ throws SQLException, IOException {
TimelineMetric metric = new TimelineMetric();
metric.setMetricName(rs.getString("METRIC_NAME"));
metric.setAppId(rs.getString("APP_ID"));
@@ -153,7 +140,7 @@ public class PhoenixHBaseAccessor {
}
static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
- throws SQLException, IOException {
+ throws SQLException, IOException {
TimelineMetric metric = new TimelineMetric();
metric.setMetricName(rs.getString("METRIC_NAME"));
metric.setAppId(rs.getString("APP_ID"));
@@ -165,11 +152,13 @@ public class PhoenixHBaseAccessor {
}
static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
- throws SQLException {
+ throws SQLException {
MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
- metricHostAggregate.setSum(rs.getDouble("METRIC_AVG"));
+ metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+ metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+
metricHostAggregate.setDeviation(0.0);
return metricHostAggregate;
}
@@ -199,7 +188,7 @@ public class PhoenixHBaseAccessor {
stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
encoding, clusterMinTtl, compression));
stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
- encoding, clusterHourTtl, compression));
+ encoding, clusterHourTtl, compression));
conn.commit();
} catch (SQLException sql) {
LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", sql);
@@ -222,7 +211,7 @@ public class PhoenixHBaseAccessor {
}
public void insertMetricRecords(TimelineMetrics metrics)
- throws SQLException, IOException {
+ throws SQLException, IOException {
List<TimelineMetric> timelineMetrics = metrics.getMetrics();
if (timelineMetrics == null || timelineMetrics.isEmpty()) {
@@ -247,7 +236,9 @@ public class PhoenixHBaseAccessor {
LOG.trace("host: " + metric.getHostName() + ", " +
"metricName = " + metric.getMetricName() + ", " +
"values: " + metric.getMetricValues());
- Double[] aggregates = calculateAggregates(metric.getMetricValues());
+ Aggregator agg = new Aggregator();
+ double[] aggregates = agg.calculateAggregates(
+ metric.getMetricValues());
metricRecordStmt.setString(1, metric.getMetricName());
//metricRecordTmpStmt.setString(1, metric.getMetricName());
@@ -268,10 +259,11 @@ public class PhoenixHBaseAccessor {
metricRecordStmt.setDouble(9, aggregates[1]);
//metricRecordTmpStmt.setDouble(9, aggregates[1]);
metricRecordStmt.setDouble(10, aggregates[2]);
+ metricRecordStmt.setLong(11, (long)aggregates[3]);
//metricRecordTmpStmt.setDouble(10, aggregates[2]);
String json =
TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
- metricRecordStmt.setString(11, json);
+ metricRecordStmt.setString(12, json);
//metricRecordTmpStmt.setString(11, json);
try {
@@ -309,35 +301,10 @@ public class PhoenixHBaseAccessor {
}
}
- private Double[] calculateAggregates(Map<Long, Double> metricValues) {
- Double[] values = new Double[3];
- Double max = Double.MIN_VALUE;
- Double min = Double.MAX_VALUE;
- Double avg = 0.0;
- if (metricValues != null && !metricValues.isEmpty()) {
- for (Double value : metricValues.values()) {
- // TODO: Some nulls in data - need to investigate null values from host
- if (value != null) {
- if (value > max) {
- max = value;
- }
- if (value < min) {
- min = value;
- }
- avg += value;
- }
- }
- avg /= metricValues.values().size();
- }
- values[0] = max != Double.MIN_VALUE ? max : 0.0;
- values[1] = min != Double.MAX_VALUE ? min : 0.0;
- values[2] = avg;
- return values;
- }
@SuppressWarnings("unchecked")
public TimelineMetrics getMetricRecords(final Condition condition)
- throws SQLException, IOException {
+ throws SQLException, IOException {
if (condition.isEmpty()) {
throw new SQLException("No filter criteria specified.");
@@ -382,8 +349,8 @@ public class PhoenixHBaseAccessor {
}
public void saveHostAggregateRecords(Map<TimelineMetric,
- MetricHostAggregate> hostAggregateMap, String phoenixTableName)
- throws SQLException {
+ MetricHostAggregate> hostAggregateMap, String phoenixTableName)
+ throws SQLException {
if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) {
Connection conn = getConnection();
@@ -397,7 +364,7 @@ public class PhoenixHBaseAccessor {
String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
- hostAggregateMap.entrySet()) {
+ hostAggregateMap.entrySet()) {
TimelineMetric metric = metricAggregate.getKey();
MetricHostAggregate hostAggregate = metricAggregate.getValue();
@@ -413,8 +380,10 @@ public class PhoenixHBaseAccessor {
stmt.setDouble(7, hostAggregate.getSum());
stmt.setDouble(8, hostAggregate.getMax());
stmt.setDouble(9, hostAggregate.getMin());
+ stmt.setDouble(10, hostAggregate.getNumberOfSamples());
try {
+ // TODO: Why is this exception swallowed?
stmt.executeUpdate();
} catch (SQLException sql) {
LOG.error(sql);
@@ -457,10 +426,11 @@ public class PhoenixHBaseAccessor {
/**
* Save Metric aggregate records.
+ *
* @throws SQLException
*/
public void saveClusterAggregateRecords(Map<TimelineClusterMetric,
- MetricClusterAggregate> records) throws SQLException {
+ MetricClusterAggregate> records) throws SQLException {
if (records == null || records.isEmpty()) {
LOG.debug("Empty aggregate records.");
return;
@@ -475,7 +445,7 @@ public class PhoenixHBaseAccessor {
int rowCount = 0;
for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
- aggregateEntry : records.entrySet()) {
+ aggregateEntry : records.entrySet()) {
TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
MetricClusterAggregate aggregate = aggregateEntry.getValue();
@@ -533,7 +503,7 @@ public class PhoenixHBaseAccessor {
public TimelineMetrics getAggregateMetricRecords(final Condition condition)
- throws SQLException {
+ throws SQLException {
if (condition.isEmpty()) {
throw new SQLException("No filter criteria specified.");
@@ -556,8 +526,8 @@ public class PhoenixHBaseAccessor {
metric.setTimestamp(rs.getLong("SERVER_TIME"));
metric.setStartTime(rs.getLong("SERVER_TIME"));
Map<Long, Double> valueMap = new HashMap<Long, Double>();
- valueMap.put(rs.getLong("SERVER_TIME"), rs.getDouble("METRIC_SUM") /
- rs.getInt("HOSTS_COUNT"));
+ valueMap.put(rs.getLong("SERVER_TIME"),
+ rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
metric.setMetricValues(valueMap);
if (condition.isGrouped()) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
index db70913..8264c41 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
@@ -15,7 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,14 +36,20 @@ public class PhoenixTransactSQL {
static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
// TODO: Configurable TTL values
/**
- * Create table to store individual metric records.
- */
+ * Create table to store individual metric records.
+ */
public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
"EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " +
- "HOSTNAME VARCHAR, SERVER_TIME UNSIGNED_LONG NOT NULL, " +
- "APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
- "START_TIME UNSIGNED_LONG, UNITS CHAR(20), " +
- "METRIC_AVG DOUBLE, METRIC_MAX DOUBLE, METRIC_MIN DOUBLE, " +
+ "HOSTNAME VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "START_TIME UNSIGNED_LONG, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE, " +
+ "METRIC_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE, " +
+ "METRIC_MIN DOUBLE, " +
"METRICS VARCHAR CONSTRAINT pk " +
"PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " +
"INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
@@ -50,82 +57,128 @@ public class PhoenixTransactSQL {
public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
"CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
- "(METRIC_NAME VARCHAR, HOSTNAME VARCHAR, " +
- "APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
- "UNITS CHAR(20), METRIC_AVG DOUBLE, METRIC_MAX DOUBLE," +
- "METRIC_MIN DOUBLE CONSTRAINT pk " +
- "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
- "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
- "TTL=%s, COMPRESSION='%s'";
+ "(METRIC_NAME VARCHAR, " +
+ "HOSTNAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE," +
+ "METRIC_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE," +
+ "METRIC_MIN DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+ "TTL=%s, COMPRESSION='%s'";
public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
"CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
- "(METRIC_NAME VARCHAR, HOSTNAME VARCHAR, " +
- "APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
- "UNITS CHAR(20), METRIC_AVG DOUBLE, METRIC_MAX DOUBLE," +
- "METRIC_MIN DOUBLE CONSTRAINT pk " +
- "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
- "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
- " COMPRESSION='%s'";
+ "(METRIC_NAME VARCHAR, " +
+ "HOSTNAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE," +
+ "METRIC_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE," +
+ "METRIC_MIN DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
+ " COMPRESSION='%s'";
public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
"CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
- "(METRIC_NAME VARCHAR, APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
- "UNITS CHAR(20), METRIC_SUM DOUBLE, " +
- "HOSTS_COUNT UNSIGNED_INT, METRIC_MAX DOUBLE, METRIC_MIN DOUBLE " +
- "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
- "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
- "TTL=%s, COMPRESSION='%s'";
+ "(METRIC_NAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_SUM DOUBLE, " +
+ "HOSTS_COUNT UNSIGNED_INT, " +
+ "METRIC_MAX DOUBLE, " +
+ "METRIC_MIN DOUBLE " +
+ "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+ "TTL=%s, COMPRESSION='%s'";
public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
"CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
- "(METRIC_NAME VARCHAR, APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
- "UNITS CHAR(20), METRIC_AVG DOUBLE, " +
- "METRIC_MAX DOUBLE, METRIC_MIN DOUBLE " +
- "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
- "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
- "TTL=%s, COMPRESSION='%s'";
+ "(METRIC_NAME VARCHAR, " +
+ "APP_ID VARCHAR, " +
+ "INSTANCE_ID VARCHAR, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "UNITS CHAR(20), " +
+ "METRIC_AVG DOUBLE, " +
+ "METRIC_MAX DOUBLE, " +
+ "METRIC_MIN DOUBLE " +
+ "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+ "TTL=%s, COMPRESSION='%s'";
/**
* Insert into metric records table.
*/
public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
"(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " +
- "UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN, METRICS) VALUES " +
- "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN, " +
+ "METRIC_COUNT, " +
+ "METRICS) VALUES " +
+ "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
"METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
- "UNITS, METRIC_SUM, HOSTS_COUNT, METRIC_MAX, METRIC_MIN) " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "HOSTS_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
"%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
- "SERVER_TIME, UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN) " +
- "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ "SERVER_TIME, " +
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN," +
+ "METRIC_COUNT) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
/**
* Retrieve a set of rows from metrics records table.
*/
public static final String GET_METRIC_SQL = "SELECT METRIC_NAME, " +
- "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, METRIC_AVG, " +
- "METRIC_MAX, METRIC_MIN, METRICS FROM %s";
+ "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN, " +
+ "METRIC_COUNT, " +
+ "METRICS " +
+ "FROM %s";
public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT " +
"METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
- "UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN FROM %s";
+ "UNITS, " +
+ "METRIC_SUM, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN, " +
+ "METRIC_COUNT " +
+ "FROM %s";
public static final String GET_CLUSTER_AGGREGATE_SQL =
"SELECT METRIC_NAME, APP_ID, " +
- "INSTANCE_ID, SERVER_TIME, METRIC_SUM, HOSTS_COUNT, METRIC_MAX, " +
- "METRIC_MIN FROM METRIC_AGGREGATE";
-
- public static final String METRICS_RECORD_TABLE_NAME =
- "METRIC_RECORD";
+ "INSTANCE_ID, SERVER_TIME, " +
+ "METRIC_SUM, " +
+ "HOSTS_COUNT, " +
+ "METRIC_MAX, " +
+ "METRIC_MIN " +
+ "FROM METRIC_AGGREGATE";
+
+ public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
"METRIC_RECORD_MINUTE";
public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
@@ -134,7 +187,7 @@ public class PhoenixTransactSQL {
public static final String DEFAULT_ENCODING = "FAST_DIFF";
public static PreparedStatement prepareGetMetricsSqlStmt(
- Connection connection, Condition condition) throws SQLException {
+ Connection connection, Condition condition) throws SQLException {
if (condition.isEmpty()) {
throw new IllegalArgumentException("Condition is empty.");
@@ -162,7 +215,7 @@ public class PhoenixTransactSQL {
PreparedStatement stmt = connection.prepareStatement(sb.toString());
int pos = 1;
if (condition.getMetricNames() != null) {
- for ( ; pos <= condition.getMetricNames().size(); pos++) {
+ for (; pos <= condition.getMetricNames().size(); pos++) {
stmt.setString(pos, condition.getMetricNames().get(pos - 1));
}
}
@@ -190,7 +243,7 @@ public class PhoenixTransactSQL {
}
public static PreparedStatement prepareGetAggregateSqlStmt(
- Connection connection, Condition condition) throws SQLException {
+ Connection connection, Condition condition) throws SQLException {
if (condition.isEmpty()) {
throw new IllegalArgumentException("Condition is empty.");
@@ -208,7 +261,7 @@ public class PhoenixTransactSQL {
PreparedStatement stmt = connection.prepareStatement(sb.toString());
int pos = 1;
if (condition.getMetricNames() != null) {
- for ( ; pos <= condition.getMetricNames().size(); pos++) {
+ for (; pos <= condition.getMetricNames().size(); pos++) {
stmt.setString(pos, condition.getMetricNames().get(pos - 1));
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/TestMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/TestMetric.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/TestMetric.java
index 2e07419..a0572a2 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/TestMetric.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/TestMetric.java
@@ -22,8 +22,8 @@ import org.junit.Test;
import java.io.IOException;
-import static org.fest.assertions.Assertions.assertThat;
-import static org.fest.assertions.MapAssert.entry;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
import static org.junit.Assert.assertEquals;
public class TestMetric {
@@ -71,7 +71,7 @@ public class TestMetric {
assertEquals("disk_free", m.getMetricname());
assertEquals("0", m.getStarttime());
- assertThat(m.getMetrics()).isNotEmpty().hasSize(4).includes(
+ assertThat(m.getMetrics()).isNotEmpty().hasSize(4).contains(
entry("0", "5.35"),
entry("5000", "5.35"),
entry("10000", "5.35"),
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java
index 49272d8..dd513aa 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimu
import org.junit.Test;
-import static org.fest.assertions.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
public class TestTimeStampProvider {
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
new file mode 100644
index 0000000..7958055
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
@@ -0,0 +1,294 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.sql.*;
+import java.util.*;
+
+import static junit.framework.Assert.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration tests for PhoenixHBaseAccessor against the getUrl() endpoint. */
+@Ignore
+public class TestHBaseAccessor {
+  private static final String MY_LOCAL_URL =
+    "jdbc:phoenix:c6503.ambari.apache.org:" + 2181 + ":/hbase";
+
+  /** Drops every metric table; closes the connection even on failure. */
+  @After
+  public void tearDown() throws Exception {
+    Connection conn = getConnection(getUrl());
+    try {
+      Statement stmt = conn.createStatement();
+      stmt.execute("DROP TABLE METRIC_AGGREGATE");
+      stmt.execute("DROP TABLE METRIC_AGGREGATE_HOURLY");
+      stmt.execute("DROP TABLE METRIC_RECORD");
+      stmt.execute("DROP TABLE METRIC_RECORD_HOURLY");
+      stmt.execute("DROP TABLE METRIC_RECORD_MINUTE");
+    } finally {
+      conn.close();
+    }
+  }
+
+  /** A canary test: creates a scratch table, counts its rows, drops it. */
+  @Test
+  public void testClusterOK() throws Exception {
+    String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
+      "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
+      "DATA_BLOCK_ENCODING='FAST_DIFF', " +
+      "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
+
+    Connection conn = getConnection(getUrl());
+    try {
+      Statement stmt = conn.createStatement();
+      stmt.executeUpdate(sampleDDL);
+      try {
+        ResultSet rs = stmt.executeQuery(
+          "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
+        assertTrue("COUNT query returned no row", rs.next());
+        assertThat(rs.getLong(1)).isGreaterThanOrEqualTo(0);
+      } finally {
+        // Drop even when an assertion fails so the scratch table never lingers.
+        stmt.execute("DROP TABLE TEST_METRICS");
+      }
+    } finally {
+      conn.close();
+    }
+  }
+
+  @Test
+  public void testShouldInsertMetrics() throws Exception {
+    // GIVEN
+    PhoenixHBaseAccessor sut = createTestableHBaseAccessor();
+
+    // WHEN
+    long startTime = System.currentTimeMillis();
+    TimelineMetrics metricsSent = prepareTimelineMetrics(startTime);
+    sut.insertMetricRecords(metricsSent);
+
+    Condition queryCondition = new Condition(null, "local", null, null,
+      startTime, startTime + (15 * 60 * 1000), null, false);
+    TimelineMetrics recordRead = sut.getMetricRecords(queryCondition);
+
+    // THEN
+    assertThat(recordRead.getMetrics()).hasSize(2)
+      .extracting("metricName")
+      .containsOnly("mem_free", "disk_free");
+
+    assertThat(metricsSent.getMetrics())
+      .usingElementComparator(TIME_IGNORING_COMPARATOR)
+      .containsExactlyElementsOf(recordRead.getMetrics());
+  }
+
+  @Test
+  public void testShouldAggregateMinuteProperly() throws Exception {
+    // GIVEN
+    PhoenixHBaseAccessor hdb = createTestableHBaseAccessor();
+    hdb.initMetricSchema();
+    TimelineMetricAggregatorMinute aggregatorMinute =
+      new TimelineMetricAggregatorMinute(hdb, new Configuration());
+
+    // Five batches one minute apart: 20 samples summing to 15.0 per metric.
+    long startTime = System.currentTimeMillis();
+    for (int minute = 0; minute < 5; minute++) {
+      hdb.insertMetricRecords(
+        prepareTimelineMetrics(startTime + minute * 60 * 1000));
+    }
+
+    // WHEN
+    long endTime = startTime + 1000 * 60 * 4;
+    boolean success = aggregatorMinute.doWork(startTime, endTime);
+    assertTrue(success);
+
+    // THEN
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+    int count = 0;
+    Connection conn = getConnection(getUrl());
+    try {
+      PreparedStatement pstmt =
+        PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+      ResultSet rs = pstmt.executeQuery();
+      while (rs.next()) {
+        TimelineMetric currentMetric =
+          PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+        AbstractTimelineAggregator.MetricHostAggregate currentHostAggregate =
+          PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+        String name = currentMetric.getMetricName();
+        if ("disk_free".equals(name) || "mem_free".equals(name)) {
+          assertHostAggregate(currentHostAggregate, 20, 15.0);
+          // Counted so the final assertion can verify both rows arrived.
+          count++;
+        } else {
+          fail("Unexpected entry");
+        }
+      }
+    } finally {
+      conn.close();
+    }
+    assertEquals("Two aggregated entries expected", 2, count);
+  }
+
+  @Test
+  public void testShouldAggregateHourProperly() throws Exception {
+    // GIVEN
+    PhoenixHBaseAccessor hdb = createTestableHBaseAccessor();
+    hdb.initMetricSchema();
+    TimelineMetricAggregatorHourly aggregator =
+      new TimelineMetricAggregatorHourly(hdb, new Configuration());
+    long startTime = System.currentTimeMillis();
+
+    AbstractTimelineAggregator.MetricHostAggregate expectedAggregate =
+      createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+    Map<TimelineMetric, AbstractTimelineAggregator.MetricHostAggregate>
+      aggMap = new HashMap<TimelineMetric,
+      AbstractTimelineAggregator.MetricHostAggregate>();
+
+    // Twelve entries five minutes apart, the first one at startTime.
+    int min_5 = 5 * 60 * 1000;
+    long ctime = startTime - min_5;
+    for (int slice = 0; slice < 12; slice++) {
+      aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    }
+
+    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+    //WHEN
+    long endTime = ctime + min_5;
+    boolean success = aggregator.doWork(startTime, endTime);
+    assertTrue(success);
+
+    //THEN
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      METRICS_AGGREGATE_HOURLY_TABLE_NAME));
+
+    Connection conn = getConnection(getUrl());
+    try {
+      PreparedStatement pstmt =
+        PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+      ResultSet rs = pstmt.executeQuery();
+      while (rs.next()) {
+        TimelineMetric currentMetric =
+          PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+        AbstractTimelineAggregator.MetricHostAggregate currentHostAggregate =
+          PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+        if ("disk_used".equals(currentMetric.getMetricName())) {
+          assertHostAggregate(currentHostAggregate, 12 * 20, 12 * 15.0);
+        }
+      }
+    } finally {
+      conn.close();
+    }
+  }
+
+  /** Asserts max 2.0, min 0.0, the given count and sum, avg = sum / count. */
+  private static void assertHostAggregate(
+      AbstractTimelineAggregator.MetricHostAggregate actual,
+      int samples, double sum) {
+    assertEquals(2.0, actual.getMax());
+    assertEquals(0.0, actual.getMin());
+    assertEquals(samples, actual.getNumberOfSamples());
+    assertEquals(sum, actual.getSum());
+    assertEquals(sum / samples, actual.getAvg());
+  }
+
+  /** Builds a "disk_used" metric for test_app/test_host at the given time. */
+  private TimelineMetric createEmptyTimelineMetric(long startTime) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName("disk_used");
+    metric.setAppId("test_app");
+    metric.setHostName("test_host");
+    metric.setTimestamp(startTime);
+    return metric;
+  }
+
+  /** Builds a host aggregate populated with the given values. */
+  private AbstractTimelineAggregator.MetricHostAggregate
+  createMetricHostAggregate(double max, double min, int numberOfSamples,
+                            double sum) {
+    AbstractTimelineAggregator.MetricHostAggregate expectedAggregate =
+      new AbstractTimelineAggregator.MetricHostAggregate();
+    expectedAggregate.setMax(max);
+    expectedAggregate.setMin(min);
+    expectedAggregate.setNumberOfSamples(numberOfSamples);
+    expectedAggregate.setSum(sum);
+    return expectedAggregate;
+  }
+
+  /** Creates an accessor fed by DriverManager; a failed connect yields null. */
+  private PhoenixHBaseAccessor createTestableHBaseAccessor() {
+    return new PhoenixHBaseAccessor(
+      new Configuration(),
+      new Configuration(),
+      new ConnectionProvider() {
+        @Override
+        public Connection getConnection() {
+          Connection connection = null;
+          try {
+            connection = DriverManager.getConnection(getUrl());
+          } catch (SQLException e) {
+            LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+          }
+          return connection;
+        }
+      });
+  }
+
+  /** Equality-only: 0 when equal apart from time, else 1; not an ordering. */
+  private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
+    new Comparator<TimelineMetric>() {
+      @Override
+      public int compare(TimelineMetric o1, TimelineMetric o2) {
+        return o1.equalsExceptTime(o2) ? 0 : 1;
+      }
+    };
+
+  /** Wraps a "disk_free" and a "mem_free" metric starting at startTime. */
+  private TimelineMetrics prepareTimelineMetrics(long startTime) {
+    TimelineMetrics metrics = new TimelineMetrics();
+    List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
+    allMetrics.add(createMetric(startTime, "disk_free"));
+    allMetrics.add(createMetric(startTime, "mem_free"));
+    metrics.setMetrics(allMetrics);
+    return metrics;
+  }
+
+  /** Builds a metric for app "host" on "local" with values 0, 0, 1, 2. */
+  private TimelineMetric createMetric(long startTime, String metricName) {
+    TimelineMetric m = new TimelineMetric();
+    m.setAppId("host");
+    m.setHostName("local");
+    m.setMetricName(metricName);
+    m.setStartTime(startTime);
+    Map<Long, Double> vals = new HashMap<Long, Double>();
+    vals.put(startTime + 15000L, 0.0);
+    vals.put(startTime + 30000L, 0.0);
+    vals.put(startTime + 45000L, 1.0);
+    vals.put(startTime + 60000L, 2.0);
+    m.setMetricValues(vals);
+    return m;
+  }
+
+  protected static String getUrl() {
+    return MY_LOCAL_URL;
+  }
+
+  private static Connection getConnection(String url) throws SQLException {
+    return DriverManager.getConnection(url);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
new file mode 100644
index 0000000..aebbdb3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
@@ -0,0 +1,50 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.AbstractTimelineAggregator.MetricHostAggregate;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestMetricHostAggregate {
+  /** A freshly built aggregate reports the sum, min, max and average given. */
+  @Test
+  public void testCreateAggregate() throws Exception {
+    // given
+    MetricHostAggregate built = newAggregate(3.0, 1.0, 2.0, 2);
+    // then
+    assertThat(built.getSum()).isEqualTo(3.0);
+    assertThat(built.getMin()).isEqualTo(1.0);
+    assertThat(built.getMax()).isEqualTo(2.0);
+    assertThat(built.getAvg()).isEqualTo(3.0 / 2);
+  }
+
+  /** Checks sum, min, max and average after merging two more aggregates. */
+  @Test
+  public void testUpdateAggregates() throws Exception {
+    // given
+    MetricHostAggregate merged = newAggregate(3.0, 1.0, 2.0, 2);
+    // when
+    merged.updateAggregates(newAggregate(8.0, 0.5, 7.5, 2));
+    merged.updateAggregates(newAggregate(1.0, 1.0, 1.0, 1));
+
+    // then
+    assertThat(merged.getSum()).isEqualTo(12.0);
+    assertThat(merged.getMin()).isEqualTo(0.5);
+    assertThat(merged.getMax()).isEqualTo(7.5);
+    assertThat(merged.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
+  }
+
+  /** Builds an aggregate from the given values with a deviation of 0.0. */
+  private MetricHostAggregate newAggregate(
+      double sum, double min, double max, int samplesCount) {
+    MetricHostAggregate result = new MetricHostAggregate();
+    result.setSum(sum);
+    result.setMax(max);
+    result.setMin(min);
+    result.setDeviation(0.0);
+    result.setNumberOfSamples(samplesCount);
+    return result;
+  }
+}
\ No newline at end of file