You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/23 23:58:48 UTC
ambari git commit: Revert "AMBARI-8872. Support point in time
queries. Breaks dashboard graphs."
Repository: ambari
Updated Branches:
refs/heads/trunk 70f1170b0 -> 102b47736
Revert "AMBARI-8872. Support point in time queries. Breaks dashboard graphs."
This reverts commit 9bf9034a5c2481a8b40befab8c3713dcd3b6f584.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/102b4773
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/102b4773
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/102b4773
Branch: refs/heads/trunk
Commit: 102b47736e9f721baa6bac434cd58bfeaf105aff
Parents: 70f1170
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Dec 23 14:43:58 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Dec 23 14:43:58 2014 -0800
----------------------------------------------------------------------
.../metrics/timeline/PhoenixHBaseAccessor.java | 473 +++++++-----------
.../metrics/timeline/PhoenixTransactSQL.java | 491 +++++--------------
.../timeline/TimelineMetricAggregator.java | 3 +-
.../TimelineMetricClusterAggregator.java | 3 +-
.../TimelineMetricClusterAggregatorHourly.java | 3 +-
.../metrics/timeline/ITClusterAggregator.java | 5 +-
.../metrics/timeline/ITMetricAggregator.java | 7 +-
.../timeline/TestPhoenixTransactSQL.java | 21 +-
.../metrics/timeline/AMSPropertyProvider.java | 76 ++-
.../timeline/AMSPropertyProviderTest.java | 92 +---
10 files changed, 341 insertions(+), 833 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 4b04ba9..b5226ee 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -34,7 +35,6 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,7 +48,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
@@ -135,6 +134,7 @@ public class PhoenixHBaseAccessor {
}
}
+
/**
* Get JDBC connection to HBase store. Assumption is that the hbase
* configuration is present on the classpath and loaded by the caller into
@@ -148,28 +148,13 @@ public class PhoenixHBaseAccessor {
return dataSource.getConnection();
}
- private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
- throws SQLException, IOException {
- TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
- metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS")));
-
- return metric;
+ public static Map readMetricFromJSON(String json) throws IOException {
+ return mapper.readValue(json, metricValuesTypeRef);
}
+ @SuppressWarnings("unchecked")
static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
throws SQLException, IOException {
- TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
- Map<Long, Double> sortedByTimeMetrics =
- new TreeMap<Long, Double>(readMetricFromJSON(rs.getString("METRICS")));
- metric.setMetricValues(sortedByTimeMetrics);
- return metric;
- }
-
- /**
- * Returns common part of timeline metrics record without the values.
- */
- private static TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs)
- throws SQLException {
TimelineMetric metric = new TimelineMetric();
metric.setMetricName(rs.getString("METRIC_NAME"));
metric.setAppId(rs.getString("APP_ID"));
@@ -178,23 +163,12 @@ public class PhoenixHBaseAccessor {
metric.setTimestamp(rs.getLong("SERVER_TIME"));
metric.setStartTime(rs.getLong("START_TIME"));
metric.setType(rs.getString("UNITS"));
+ Map<Long, Double> sortedByTimeMetrics =
+ new TreeMap<Long, Double>((Map<Long, Double>) readMetricFromJSON(rs.getString("METRICS")));
+ metric.setMetricValues(sortedByTimeMetrics);
return metric;
}
- private static Map<Long, Double> readLastMetricValueFromJSON(String json)
- throws IOException {
- Map<Long, Double> values = readMetricFromJSON(json);
- Long lastTimeStamp = Collections.max(values.keySet());
-
- return Collections.singletonMap(lastTimeStamp, values.get(lastTimeStamp));
- }
-
- @SuppressWarnings("unchecked")
- public static Map<Long, Double> readMetricFromJSON(String json)
- throws IOException {
- return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef);
- }
-
static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
throws SQLException, IOException {
TimelineMetric metric = new TimelineMetric();
@@ -343,11 +317,9 @@ public class PhoenixHBaseAccessor {
for (TimelineMetric metric : timelineMetrics) {
metricRecordStmt.clearParameters();
- if (LOG.isTraceEnabled()) {
- LOG.trace("host: " + metric.getHostName() + ", " +
- "metricName = " + metric.getMetricName() + ", " +
- "values: " + metric.getMetricValues());
- }
+ LOG.trace("host: " + metric.getHostName() + ", " +
+ "metricName = " + metric.getMetricName() + ", " +
+ "values: " + metric.getMetricValues());
Aggregator agg = new Aggregator();
double[] aggregates = agg.calculateAggregates(
metric.getMetricValues());
@@ -394,32 +366,31 @@ public class PhoenixHBaseAccessor {
}
}
+
@SuppressWarnings("unchecked")
public TimelineMetrics getMetricRecords(final Condition condition)
throws SQLException, IOException {
- verifyCondition(condition);
+ if (condition.isEmpty()) {
+ throw new SQLException("No filter criteria specified.");
+ }
Connection conn = getConnection();
PreparedStatement stmt = null;
TimelineMetrics metrics = new TimelineMetrics();
try {
- //get latest
- if(condition.isPointInTime()){
- stmt = getLatestMetricRecords(condition, conn, metrics);
- } else {
- stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+ stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
- ResultSet rs = stmt.executeQuery();
- while (rs.next()) {
- TimelineMetric metric = getTimelineMetricFromResultSet(rs);
+ ResultSet rs = stmt.executeQuery();
- if (condition.isGrouped()) {
- metrics.addOrMergeTimelineMetric(metric);
- } else {
- metrics.getMetrics().add(metric);
- }
+ while (rs.next()) {
+ TimelineMetric metric = getTimelineMetricFromResultSet(rs);
+
+ if (condition.isGrouped()) {
+ metrics.addOrMergeTimelineMetric(metric);
+ } else {
+ metrics.getMetrics().add(metric);
}
}
@@ -439,221 +410,174 @@ public class PhoenixHBaseAccessor {
}
}
}
-
- LOG.info("Metrics records size: " + metrics.getMetrics().size());
return metrics;
}
- private PreparedStatement getLatestMetricRecords(
- Condition condition, Connection conn, TimelineMetrics metrics)
- throws SQLException, IOException {
- PreparedStatement stmt = null;
- SplitByMetricNamesCondition splitCondition =
- new SplitByMetricNamesCondition(condition);
-
- for (String metricName: splitCondition.getOriginalMetricNames()) {
- splitCondition.setCurrentMetric(metricName);
- stmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn,
- splitCondition);
-
- ResultSet rs = stmt.executeQuery();
- while (rs.next()) {
- TimelineMetric metric = getLastTimelineMetricFromResultSet(rs);
- metrics.getMetrics().add(metric);
- }
- }
+ public void saveHostAggregateRecords(Map<TimelineMetric,
+ MetricHostAggregate> hostAggregateMap, String phoenixTableName)
+ throws SQLException {
- return stmt;
- }
+ if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) {
+ Connection conn = getConnection();
+ PreparedStatement stmt = null;
- /**
- * Get metrics aggregated across hosts.
- *
- * @param condition @Condition
- * @return @TimelineMetrics
- * @throws SQLException
- */
- public TimelineMetrics getAggregateMetricRecords(final Condition condition)
- throws SQLException {
+ long start = System.currentTimeMillis();
+ int rowCount = 0;
- verifyCondition(condition);
+ try {
+ stmt = conn.prepareStatement(
+ String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
+
+ for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
+ hostAggregateMap.entrySet()) {
+
+ TimelineMetric metric = metricAggregate.getKey();
+ MetricHostAggregate hostAggregate = metricAggregate.getValue();
+
+ rowCount++;
+ stmt.clearParameters();
+ stmt.setString(1, metric.getMetricName());
+ stmt.setString(2, metric.getHostName());
+ stmt.setString(3, metric.getAppId());
+ stmt.setString(4, metric.getInstanceId());
+ stmt.setLong(5, metric.getTimestamp());
+ stmt.setString(6, metric.getType());
+ stmt.setDouble(7, hostAggregate.getSum());
+ stmt.setDouble(8, hostAggregate.getMax());
+ stmt.setDouble(9, hostAggregate.getMin());
+ stmt.setDouble(10, hostAggregate.getNumberOfSamples());
+
+ try {
+ // TODO: Why this exception is swallowed
+ stmt.executeUpdate();
+ } catch (SQLException sql) {
+ LOG.error(sql);
+ }
- Connection conn = getConnection();
- PreparedStatement stmt = null;
- TimelineMetrics metrics = new TimelineMetrics();
+ if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+ conn.commit();
+ rowCount = 0;
+ }
- try {
- //get latest
- if(condition.isPointInTime()) {
- stmt = getLatestAggregateMetricRecords(condition, conn, metrics);
- } else {
- stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
+ }
- ResultSet rs = stmt.executeQuery();
- while (rs.next()) {
- TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs);
+ conn.commit();
- if (condition.isGrouped()) {
- metrics.addOrMergeTimelineMetric(metric);
- } else {
- metrics.getMetrics().add(metric);
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
}
}
- }
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- // Ignore
- }
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException sql) {
- // Ignore
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
}
}
- }
-
- LOG.info("Aggregate records size: " + metrics.getMetrics().size());
- return metrics;
- }
- private PreparedStatement getLatestAggregateMetricRecords(
- Condition condition, Connection conn, TimelineMetrics metrics)
- throws SQLException {
+ long end = System.currentTimeMillis();
- PreparedStatement stmt = null;
- SplitByMetricNamesCondition splitCondition =
- new SplitByMetricNamesCondition(condition);
-
- for (String metricName: splitCondition.getOriginalMetricNames()) {
-
- splitCondition.setCurrentMetric(metricName);
- stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn,
- splitCondition);
-
- ResultSet rs = stmt.executeQuery();
- while (rs.next()) {
- TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs);
- metrics.getMetrics().add(metric);
+ if ((end - start) > 60000l) {
+ LOG.info("Time to save map: " + (end - start) + ", " +
+ "thread = " + Thread.currentThread().getClass());
}
}
-
- return stmt;
- }
-
- private TimelineMetric getAggregateTimelineMetricFromResultSet(
- ResultSet rs) throws SQLException {
- TimelineMetric metric = new TimelineMetric();
- metric.setMetricName(rs.getString("METRIC_NAME"));
- metric.setAppId(rs.getString("APP_ID"));
- metric.setInstanceId(rs.getString("INSTANCE_ID"));
- metric.setTimestamp(rs.getLong("SERVER_TIME"));
- metric.setStartTime(rs.getLong("SERVER_TIME"));
- Map<Long, Double> valueMap = Collections.singletonMap(
- rs.getLong("SERVER_TIME"),
- rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
- metric.setMetricValues(valueMap);
-
- return metric;
- }
-
- private void verifyCondition(Condition condition) throws SQLException {
- if (condition.isEmpty()) {
- throw new SQLException("No filter criteria specified.");
- }
}
- public void saveHostAggregateRecords(Map<TimelineMetric,
- MetricHostAggregate> hostAggregateMap, String phoenixTableName)
+ /**
+ * Save Metric aggregate records.
+ *
+ * @throws SQLException
+ */
+ public void saveClusterAggregateRecords(
+ Map<TimelineClusterMetric, MetricClusterAggregate> records)
throws SQLException {
- if (hostAggregateMap == null || hostAggregateMap.isEmpty()) {
- LOG.debug("Empty aggregate records.");
- return;
- }
-
- Connection conn = getConnection();
- PreparedStatement stmt = null;
-
- long start = System.currentTimeMillis();
- int rowCount = 0;
+ if (records == null || records.isEmpty()) {
+ LOG.debug("Empty aggregate records.");
+ return;
+ }
- try {
- stmt = conn.prepareStatement(
- String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
+ long start = System.currentTimeMillis();
- for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
- hostAggregateMap.entrySet()) {
+ Connection conn = getConnection();
+ PreparedStatement stmt = null;
+ try {
+ stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+ int rowCount = 0;
- TimelineMetric metric = metricAggregate.getKey();
- MetricHostAggregate hostAggregate = metricAggregate.getValue();
+ for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+ aggregateEntry : records.entrySet()) {
+ TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+ MetricClusterAggregate aggregate = aggregateEntry.getValue();
- rowCount++;
- stmt.clearParameters();
- stmt.setString(1, metric.getMetricName());
- stmt.setString(2, metric.getHostName());
- stmt.setString(3, metric.getAppId());
- stmt.setString(4, metric.getInstanceId());
- stmt.setLong(5, metric.getTimestamp());
- stmt.setString(6, metric.getType());
- stmt.setDouble(7, hostAggregate.getSum());
- stmt.setDouble(8, hostAggregate.getMax());
- stmt.setDouble(9, hostAggregate.getMin());
- stmt.setDouble(10, hostAggregate.getNumberOfSamples());
+ LOG.trace("clusterMetric = " + clusterMetric + ", " +
+ "aggregate = " + aggregate);
- try {
- stmt.executeUpdate();
- } catch (SQLException sql) {
- LOG.error(sql);
- }
+ rowCount++;
+ stmt.clearParameters();
+ stmt.setString(1, clusterMetric.getMetricName());
+ stmt.setString(2, clusterMetric.getAppId());
+ stmt.setString(3, clusterMetric.getInstanceId());
+ stmt.setLong(4, clusterMetric.getTimestamp());
+ stmt.setString(5, clusterMetric.getType());
+ stmt.setDouble(6, aggregate.getSum());
+ stmt.setInt(7, aggregate.getNumberOfHosts());
+ stmt.setDouble(8, aggregate.getMax());
+ stmt.setDouble(9, aggregate.getMin());
+
+ try {
+ stmt.executeUpdate();
+ } catch (SQLException sql) {
+ // TODO: Why this exception is swallowed
+ LOG.error(sql);
+ }
- if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
- conn.commit();
- rowCount = 0;
+ if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+ conn.commit();
+ rowCount = 0;
+ }
}
- }
-
- conn.commit();
+ conn.commit();
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- // Ignore
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
}
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException sql) {
- // Ignore
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
}
}
+ long end = System.currentTimeMillis();
+ if ((end - start) > 60000l) {
+ LOG.info("Time to save: " + (end - start) + ", " +
+ "thread = " + Thread.currentThread().getName());
+ }
}
- long end = System.currentTimeMillis();
-
- if ((end - start) > 60000l) {
- LOG.info("Time to save map: " + (end - start) + ", " +
- "thread = " + Thread.currentThread().getClass());
- }
- }
-
/**
* Save Metric aggregate records.
*
* @throws SQLException
*/
- public void saveClusterAggregateRecords(
- Map<TimelineClusterMetric, MetricClusterAggregate> records)
+ public void saveClusterAggregateHourlyRecords(
+ Map<TimelineClusterMetric, MetricHostAggregate> records,
+ String tableName)
throws SQLException {
-
if (records == null || records.isEmpty()) {
LOG.debug("Empty aggregate records.");
return;
@@ -664,18 +588,17 @@ public class PhoenixHBaseAccessor {
Connection conn = getConnection();
PreparedStatement stmt = null;
try {
- stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+ stmt = conn.prepareStatement(String.format
+ (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
int rowCount = 0;
- for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+ for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
aggregateEntry : records.entrySet()) {
TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
- MetricClusterAggregate aggregate = aggregateEntry.getValue();
+ MetricHostAggregate aggregate = aggregateEntry.getValue();
- if (LOG.isTraceEnabled()) {
- LOG.trace("clusterMetric = " + clusterMetric + ", " +
- "aggregate = " + aggregate);
- }
+ LOG.trace("clusterMetric = " + clusterMetric + ", " +
+ "aggregate = " + aggregate);
rowCount++;
stmt.clearParameters();
@@ -685,7 +608,8 @@ public class PhoenixHBaseAccessor {
stmt.setLong(4, clusterMetric.getTimestamp());
stmt.setString(5, clusterMetric.getType());
stmt.setDouble(6, aggregate.getSum());
- stmt.setInt(7, aggregate.getNumberOfHosts());
+// stmt.setInt(7, aggregate.getNumberOfHosts());
+ stmt.setLong(7, aggregate.getNumberOfSamples());
stmt.setDouble(8, aggregate.getMax());
stmt.setDouble(9, aggregate.getMin());
@@ -727,68 +651,48 @@ public class PhoenixHBaseAccessor {
}
}
-
/**
- * Save Metric aggregate records.
+ * Get metrics aggregated across hosts.
*
+ * @param condition @Condition
+ * @return @TimelineMetrics
* @throws SQLException
*/
- public void saveClusterAggregateHourlyRecords(
- Map<TimelineClusterMetric, MetricHostAggregate> records,
- String tableName)
+ public TimelineMetrics getAggregateMetricRecords(final Condition condition)
throws SQLException {
- if (records == null || records.isEmpty()) {
- LOG.debug("Empty aggregate records.");
- return;
- }
- long start = System.currentTimeMillis();
+ if (condition.isEmpty()) {
+ throw new SQLException("No filter criteria specified.");
+ }
Connection conn = getConnection();
PreparedStatement stmt = null;
- try {
- stmt = conn.prepareStatement(String.format
- (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
- int rowCount = 0;
-
- for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
- aggregateEntry : records.entrySet()) {
- TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
- MetricHostAggregate aggregate = aggregateEntry.getValue();
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("clusterMetric = " + clusterMetric + ", " +
- "aggregate = " + aggregate);
- }
+ TimelineMetrics metrics = new TimelineMetrics();
- rowCount++;
- stmt.clearParameters();
- stmt.setString(1, clusterMetric.getMetricName());
- stmt.setString(2, clusterMetric.getAppId());
- stmt.setString(3, clusterMetric.getInstanceId());
- stmt.setLong(4, clusterMetric.getTimestamp());
- stmt.setString(5, clusterMetric.getType());
- stmt.setDouble(6, aggregate.getSum());
-// stmt.setInt(7, aggregate.getNumberOfHosts());
- stmt.setLong(7, aggregate.getNumberOfSamples());
- stmt.setDouble(8, aggregate.getMax());
- stmt.setDouble(9, aggregate.getMin());
+ try {
+ stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
- try {
- stmt.executeUpdate();
- } catch (SQLException sql) {
- // we have no way to verify it works!!!
- LOG.error(sql);
- }
+ ResultSet rs = stmt.executeQuery();
- if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
- conn.commit();
- rowCount = 0;
+ while (rs.next()) {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setMetricName(rs.getString("METRIC_NAME"));
+ metric.setAppId(rs.getString("APP_ID"));
+ metric.setInstanceId(rs.getString("INSTANCE_ID"));
+ metric.setTimestamp(rs.getLong("SERVER_TIME"));
+ metric.setStartTime(rs.getLong("SERVER_TIME"));
+ Map<Long, Double> valueMap = new HashMap<Long, Double>();
+ valueMap.put(rs.getLong("SERVER_TIME"),
+ rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
+ metric.setMetricValues(valueMap);
+
+ if (condition.isGrouped()) {
+ metrics.addOrMergeTimelineMetric(metric);
+ } else {
+ metrics.getMetrics().add(metric);
}
}
- conn.commit();
-
} finally {
if (stmt != null) {
try {
@@ -805,10 +709,7 @@ public class PhoenixHBaseAccessor {
}
}
}
- long end = System.currentTimeMillis();
- if ((end - start) > 60000l) {
- LOG.info("Time to save: " + (end - start) + ", " +
- "thread = " + Thread.currentThread().getName());
- }
+ LOG.info("Aggregate records size: " + metrics.getMetrics().size());
+ return metrics;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
index 421d533..fb02dc7 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
@@ -17,13 +17,11 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-import com.sun.xml.bind.v2.util.QNameMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@@ -34,6 +32,7 @@ import java.util.Set;
public class PhoenixTransactSQL {
static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+ // TODO: Configurable TTL values
/**
* Create table to store individual metric records.
*/
@@ -207,10 +206,8 @@ public class PhoenixTransactSQL {
public static final String DEFAULT_ENCODING = "FAST_DIFF";
public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
- /**
- * Filter to optimize HBase scan by using file timestamps. This prevents
+ /** Filter to optimize HBase scan by using file timestamps. This prevents
* a full table scan of metric records.
- *
* @return Phoenix Hint String
*/
public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
@@ -246,47 +243,33 @@ public class PhoenixTransactSQL {
sb.append(" LIMIT ").append(condition.getLimit());
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
- }
+ LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
PreparedStatement stmt = connection.prepareStatement(sb.toString());
int pos = 1;
if (condition.getMetricNames() != null) {
for (; pos <= condition.getMetricNames().size(); pos++) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
- }
+ LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
stmt.setString(pos, condition.getMetricNames().get(pos - 1));
}
}
if (condition.getHostname() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
- }
+ LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
stmt.setString(pos++, condition.getHostname());
}
if (condition.getAppId() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
- }
+ LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
stmt.setString(pos++, condition.getAppId());
}
if (condition.getInstanceId() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
- }
+ LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
stmt.setString(pos++, condition.getInstanceId());
}
if (condition.getStartTime() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
- }
+ LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
stmt.setLong(pos++, condition.getStartTime());
}
if (condition.getEndTime() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
- }
+ LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
stmt.setLong(pos, condition.getEndTime());
}
if (condition.getFetchSize() != null) {
@@ -297,80 +280,6 @@ public class PhoenixTransactSQL {
}
- public static PreparedStatement prepareGetLatestMetricSqlStmt(
- Connection connection, Condition condition) throws SQLException {
-
- if (condition.isEmpty()) {
- throw new IllegalArgumentException("Condition is empty.");
- }
-
- if (condition.getMetricNames() == null
- || condition.getMetricNames().size() == 0) {
- throw new IllegalArgumentException("Point in time query without " +
- "metric names not supported ");
- }
-
- String stmtStr;
- if (condition.getStatement() != null) {
- stmtStr = condition.getStatement();
- } else {
- stmtStr = String.format(GET_METRIC_SQL,
- "",
- METRICS_RECORD_TABLE_NAME);
- }
-
- StringBuilder sb = new StringBuilder(stmtStr);
- sb.append(" WHERE ");
- sb.append(condition.getConditionClause());
- String orderByClause = condition.getOrderByClause();
- if (orderByClause != null) {
- sb.append(orderByClause);
- } else {
- sb.append(" ORDER BY SERVER_TIME DESC, METRIC_NAME ");
- }
-
- sb.append(" LIMIT ").append(condition.getMetricNames().size());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
- }
- PreparedStatement stmt = connection.prepareStatement(sb.toString());
- int pos = 1;
- if (condition.getMetricNames() != null) {
- //IGNORE condition limit, set one based on number of metric names
- for (; pos <= condition.getMetricNames().size(); pos++) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
- }
- stmt.setString(pos, condition.getMetricNames().get(pos - 1));
- }
- }
- if (condition.getHostname() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
- }
- stmt.setString(pos++, condition.getHostname());
- }
- if (condition.getAppId() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
- }
- stmt.setString(pos++, condition.getAppId());
- }
- if (condition.getInstanceId() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
- }
- stmt.setString(pos++, condition.getInstanceId());
- }
-
- if (condition.getFetchSize() != null) {
- stmt.setFetchSize(condition.getFetchSize());
- }
-
- return stmt;
- }
-
public static PreparedStatement prepareGetAggregateSqlStmt(
Connection connection, Condition condition) throws SQLException {
@@ -389,9 +298,7 @@ public class PhoenixTransactSQL {
String query = String.format(sb.toString(),
PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(),
NATIVE_TIME_RANGE_DELTA));
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL => " + query + ", condition => " + condition);
- }
+ LOG.debug("SQL => " + query + ", condition => " + condition);
PreparedStatement stmt = connection.prepareStatement(query);
int pos = 1;
if (condition.getMetricNames() != null) {
@@ -416,87 +323,7 @@ public class PhoenixTransactSQL {
return stmt;
}
- public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt(
- Connection connection, Condition condition) throws SQLException {
-
- if (condition.isEmpty()) {
- throw new IllegalArgumentException("Condition is empty.");
- }
-
- if (condition.getMetricNames() == null
- || condition.getMetricNames().size() == 0) {
- throw new IllegalArgumentException("Point in time query without " +
- "metric names not supported ");
- }
-
- String stmtStr;
- if (condition.getStatement() != null) {
- stmtStr = condition.getStatement();
- } else {
- stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, "");
- }
-
- StringBuilder sb = new StringBuilder(stmtStr);
- sb.append(" WHERE ");
- sb.append(condition.getConditionClause());
- String orderByClause = condition.getOrderByClause();
- if (orderByClause != null) {
- sb.append(orderByClause);
- } else {
- sb.append(" ORDER BY SERVER_TIME DESC, METRIC_NAME ");
- }
-
- sb.append(" LIMIT ").append(condition.getMetricNames().size());
-
- String query = sb.toString();
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL: " + query + ", condition: " + condition);
- }
-
- PreparedStatement stmt = connection.prepareStatement(query);
- int pos = 1;
- if (condition.getMetricNames() != null) {
- for (; pos <= condition.getMetricNames().size(); pos++) {
- stmt.setString(pos, condition.getMetricNames().get(pos - 1));
- }
- }
- if (condition.getAppId() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
- }
- stmt.setString(pos++, condition.getAppId());
- }
- if (condition.getInstanceId() != null) {
- stmt.setString(pos++, condition.getInstanceId());
- }
-
- return stmt;
- }
-
- static interface Condition {
-
- boolean isEmpty();
-
- List<String> getMetricNames();
- boolean isPointInTime();
- boolean isGrouped();
- void setStatement(String statement);
- String getHostname();
- String getAppId();
- String getInstanceId();
- String getConditionClause();
- String getOrderByClause();
- String getStatement();
- Long getStartTime();
- Long getEndTime();
- Integer getLimit();
- Integer getFetchSize();
- void setFetchSize(Integer fetchSize);
- void addOrderByColumn(String column);
- void setNoLimit();
- }
-
- static class DefaultCondition implements Condition {
+ static class Condition {
List<String> metricNames;
String hostname;
String appId;
@@ -510,7 +337,7 @@ public class PhoenixTransactSQL {
String statement;
Set<String> orderByColumns = new LinkedHashSet<String>();
- DefaultCondition(List<String> metricNames, String hostname, String appId,
+ Condition(List<String> metricNames, String hostname, String appId,
String instanceId, Long startTime, Long endTime, Integer limit,
boolean grouped) {
this.metricNames = metricNames;
@@ -523,22 +350,22 @@ public class PhoenixTransactSQL {
this.grouped = grouped;
}
- public String getStatement() {
+ String getStatement() {
return statement;
}
- public void setStatement(String statement) {
+ void setStatement(String statement) {
this.statement = statement;
}
- public List<String> getMetricNames() {
+ List<String> getMetricNames() {
return metricNames == null || metricNames.isEmpty() ? null : metricNames;
}
String getMetricsClause() {
StringBuilder sb = new StringBuilder("(");
if (metricNames != null) {
- for (String name : getMetricNames()) {
+ for (String name : metricNames) {
if (sb.length() != 1) {
sb.append(", ");
}
@@ -551,48 +378,61 @@ public class PhoenixTransactSQL {
}
}
- public String getConditionClause() {
+ String getConditionClause() {
StringBuilder sb = new StringBuilder();
boolean appendConjunction = false;
if (getMetricNames() != null) {
- if (appendConjunction) {
- sb.append(" AND");
- }
-
sb.append("METRIC_NAME IN ");
sb.append(getMetricsClause());
appendConjunction = true;
}
-
- appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?");
- appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
- appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
- appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
- append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
-
- return sb.toString();
- }
-
- protected static boolean append(StringBuilder sb,
- boolean appendConjunction,
- Object value, String str) {
- if (value != null) {
- if (appendConjunction) {
- sb.append(" AND");
- }
-
- sb.append(str);
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getHostname() != null) {
+ sb.append(" HOSTNAME = ?");
appendConjunction = true;
}
- return appendConjunction;
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getAppId() != null) {
+ sb.append(" APP_ID = ?");
+ appendConjunction = true;
+ }
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getInstanceId() != null) {
+ sb.append(" INSTANCE_ID = ?");
+ appendConjunction = true;
+ }
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getStartTime() != null) {
+ sb.append(" SERVER_TIME >= ?");
+ appendConjunction = true;
+ }
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ if (getEndTime() != null) {
+ sb.append(" SERVER_TIME < ?");
+ }
+ return sb.toString();
}
- public String getHostname() {
+ String getHostname() {
return hostname == null || hostname.isEmpty() ? null : hostname;
}
- public String getAppId() {
+ String getAppId() {
if (appId != null && !appId.isEmpty()) {
if (!appId.equals("HOST")) {
return appId.toLowerCase();
@@ -603,27 +443,22 @@ public class PhoenixTransactSQL {
return null;
}
- public String getInstanceId() {
+ String getInstanceId() {
return instanceId == null || instanceId.isEmpty() ? null : instanceId;
}
/**
* Convert to millis.
*/
- public Long getStartTime() {
- if (startTime == null) {
- return null;
- } else if (startTime < 9999999999l) {
+ Long getStartTime() {
+ if (startTime < 9999999999l) {
return startTime * 1000;
} else {
return startTime;
}
}
- public Long getEndTime() {
- if (endTime == null) {
- return null;
- }
+ Long getEndTime() {
if (endTime < 9999999999l) {
return endTime * 1000;
} else {
@@ -631,26 +466,22 @@ public class PhoenixTransactSQL {
}
}
- public void setNoLimit() {
+ void setNoLimit() {
this.noLimit = true;
}
- public Integer getLimit() {
+ Integer getLimit() {
if (noLimit) {
return null;
}
return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
}
- public boolean isGrouped() {
+ boolean isGrouped() {
return grouped;
}
- public boolean isPointInTime() {
- return getStartTime() == null && getEndTime() == null;
- }
-
- public boolean isEmpty() {
+ boolean isEmpty() {
return (metricNames == null || metricNames.isEmpty())
&& (hostname == null || hostname.isEmpty())
&& (appId == null || appId.isEmpty())
@@ -659,19 +490,19 @@ public class PhoenixTransactSQL {
&& endTime == null;
}
- public Integer getFetchSize() {
+ Integer getFetchSize() {
return fetchSize;
}
- public void setFetchSize(Integer fetchSize) {
+ void setFetchSize(Integer fetchSize) {
this.fetchSize = fetchSize;
}
- public void addOrderByColumn(String column) {
+ void addOrderByColumn(String column) {
orderByColumns.add(column);
}
- public String getOrderByClause() {
+ String getOrderByClause() {
String orderByStr = " ORDER BY ";
if (!orderByColumns.isEmpty()) {
StringBuilder sb = new StringBuilder(orderByStr);
@@ -704,172 +535,70 @@ public class PhoenixTransactSQL {
}
}
- static class LikeCondition extends DefaultCondition {
+ static class LikeCondition extends Condition {
LikeCondition(List<String> metricNames, String hostname,
- String appId, String instanceId, Long startTime,
- Long endTime, Integer limit, boolean grouped) {
+ String appId, String instanceId, Long startTime,
+ Long endTime, Integer limit, boolean grouped) {
super(metricNames, hostname, appId, instanceId, startTime, endTime,
- limit, grouped);
+ limit, grouped);
}
@Override
- public String getConditionClause() {
+ String getConditionClause() {
StringBuilder sb = new StringBuilder();
boolean appendConjunction = false;
if (getMetricNames() != null) {
sb.append("(");
- for (String name : getMetricNames()) {
+ for (String name : metricNames) {
if (sb.length() > 1) {
sb.append(" OR ");
}
sb.append("METRIC_NAME LIKE ?");
}
-
sb.append(")");
appendConjunction = true;
}
-
- appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?");
- appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
- appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
- appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
- append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
-
- return sb.toString();
- }
- }
-
- static class SplitByMetricNamesCondition implements Condition {
- private final Condition adaptee;
- private String currentMetric;
-
- SplitByMetricNamesCondition(Condition condition){
- this.adaptee = condition;
- }
-
- @Override
- public boolean isEmpty() {
- return adaptee.isEmpty();
- }
-
- @Override
- public List<String> getMetricNames() {
- return Collections.singletonList(currentMetric);
- }
-
- @Override
- public boolean isPointInTime() {
- return adaptee.isPointInTime();
- }
-
- @Override
- public boolean isGrouped() {
- return adaptee.isGrouped();
- }
-
- @Override
- public void setStatement(String statement) {
- adaptee.setStatement(statement);
- }
-
- @Override
- public String getHostname() {
- return adaptee.getHostname();
- }
-
- @Override
- public String getAppId() {
- return adaptee.getAppId();
- }
-
- @Override
- public String getInstanceId() {
- return adaptee.getInstanceId();
- }
-
- @Override
- public String getConditionClause() {
- StringBuilder sb = new StringBuilder();
- boolean appendConjunction = false;
-
- if (getMetricNames() != null) {
- for (String name : getMetricNames()) {
- if (sb.length() > 1) {
- sb.append(" OR ");
- }
- sb.append("METRIC_NAME = ?");
- }
-
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getHostname() != null) {
+ sb.append(" HOSTNAME = ?");
appendConjunction = true;
}
-
- appendConjunction = DefaultCondition.append(sb, appendConjunction,
- getHostname(), " HOSTNAME = ?");
- appendConjunction = DefaultCondition.append(sb, appendConjunction,
- getAppId(), " APP_ID = ?");
- appendConjunction = DefaultCondition.append(sb, appendConjunction,
- getInstanceId(), " INSTANCE_ID = ?");
- appendConjunction = DefaultCondition.append(sb, appendConjunction,
- getStartTime(), " SERVER_TIME >= ?");
- DefaultCondition.append(sb, appendConjunction, getEndTime(),
- " SERVER_TIME < ?");
-
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getAppId() != null) {
+ sb.append(" APP_ID = ?");
+ appendConjunction = true;
+ }
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getInstanceId() != null) {
+ sb.append(" INSTANCE_ID = ?");
+ appendConjunction = true;
+ }
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ appendConjunction = false;
+ if (getStartTime() != null) {
+ sb.append(" SERVER_TIME >= ?");
+ appendConjunction = true;
+ }
+ if (appendConjunction) {
+ sb.append(" AND");
+ }
+ if (getEndTime() != null) {
+ sb.append(" SERVER_TIME < ?");
+ }
return sb.toString();
}
-
- @Override
- public String getOrderByClause() {
- return adaptee.getOrderByClause();
- }
-
- @Override
- public String getStatement() {
- return adaptee.getStatement();
- }
-
- @Override
- public Long getStartTime() {
- return adaptee.getStartTime();
- }
-
- @Override
- public Long getEndTime() {
- return adaptee.getEndTime();
- }
-
- @Override
- public Integer getLimit() {
- return adaptee.getLimit();
- }
-
- @Override
- public Integer getFetchSize() {
- return adaptee.getFetchSize();
- }
-
- @Override
- public void setFetchSize(Integer fetchSize) {
- adaptee.setFetchSize(fetchSize);
- }
-
- @Override
- public void addOrderByColumn(String column) {
- adaptee.addOrderByColumn(column);
- }
-
- @Override
- public void setNoLimit() {
- adaptee.setNoLimit();
- }
-
- public List<String> getOriginalMetricNames() {
- return adaptee.getMetricNames();
- }
-
- public void setCurrentMetric(String currentMetric) {
- this.currentMetric = currentMetric;
- }
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
index f4f895f..cab154b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
@@ -27,7 +27,6 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
public class TimelineMetricAggregator extends AbstractTimelineAggregator {
@@ -79,7 +78,7 @@ public class TimelineMetricAggregator extends AbstractTimelineAggregator {
@Override
protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
- Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ Condition condition = new Condition(null, null, null, null, startTime,
endTime, null, true);
condition.setNoLimit();
condition.setFetchSize(resultsetFetchSize);
http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
index e291f36..654c188 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
@@ -92,7 +91,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
@Override
protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
- Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ Condition condition = new Condition(null, null, null, null, startTime,
endTime, null, true);
condition.setNoLimit();
condition.setFetchSize(resultsetFetchSize);
http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
index 1d5c5a4..7764ea3 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
@@ -31,7 +31,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
@@ -90,7 +89,7 @@ public class TimelineMetricClusterAggregatorHourly extends
@Override
protected Condition prepareMetricQueryCondition(long startTime,
long endTime) {
- Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ Condition condition = new Condition(null, null, null, null, startTime,
endTime, null, true);
condition.setNoLimit();
condition.setFetchSize(resultsetFetchSize);
http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
index 2da9c82..f7e53f5 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -38,7 +38,6 @@ import java.util.Map;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.fail;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
@@ -96,7 +95,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
boolean success = agg.doWork(startTime, endTime);
//THEN
- Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ Condition condition = new Condition(null, null, null, null, startTime,
endTime, null, true);
condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));
@@ -156,7 +155,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
boolean success = agg.doWork(startTime, endTime);
//THEN
- Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ Condition condition = new Condition(null, null, null, null, startTime,
endTime, null, true);
condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));
http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
index 22e1363..d166a22 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
@@ -39,7 +39,6 @@ import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.fail;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
@@ -85,7 +84,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
hdb.insertMetricRecords(metricsSent);
- Condition queryCondition = new DefaultCondition(null, "local", null, null,
+ Condition queryCondition = new Condition(null, "local", null, null,
startTime, startTime + (15 * 60 * 1000), null, false);
TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition);
@@ -121,7 +120,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
boolean success = aggregatorMinute.doWork(startTime, endTime);
//THEN
- Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ Condition condition = new Condition(null, null, null, null, startTime,
endTime, null, true);
condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
@@ -200,7 +199,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
assertTrue(success);
//THEN
- Condition condition = new DefaultCondition(null, null, null, null, startTime,
+ Condition condition = new Condition(null, null, null, null, startTime,
endTime, null, true);
condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
index 333b13b..1659e46 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
@@ -24,14 +24,12 @@ import java.util.Arrays;
import java.util.Collections;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LikeCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition;
public class TestPhoenixTransactSQL {
@Test
public void testConditionClause() throws Exception {
- Condition condition = new DefaultCondition(
+ Condition condition = new Condition(
Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1",
1407959718L, 1407959918L, null, false);
@@ -44,23 +42,6 @@ public class TestPhoenixTransactSQL {
}
@Test
- public void testSplitByMetricNamesCondition() throws Exception {
- Condition c = new DefaultCondition(
- Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1",
- 1407959718L, 1407959918L, null, false);
-
- SplitByMetricNamesCondition condition = new SplitByMetricNamesCondition(c);
- condition.setCurrentMetric(c.getMetricNames().get(0));
-
- String preparedClause = condition.getConditionClause();
- String expectedClause = "METRIC_NAME = ? AND HOSTNAME = ? AND " +
- "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?";
-
- Assert.assertNotNull(preparedClause);
- Assert.assertEquals(expectedClause, preparedClause);
- }
-
- @Test
public void testLikeConditionClause() throws Exception {
Condition condition = new LikeCondition(
Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1",
http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
index e9aac45..585a28c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
@@ -137,8 +137,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
*/
public Collection<Resource> populateResources() throws SystemException {
// No open ended query support.
- if (temporalInfo != null && (temporalInfo.getStartTime() == null
- || temporalInfo.getEndTime() == null)) {
+ if (temporalInfo == null || temporalInfo.getStartTime() == null ||
+ temporalInfo.getEndTime() == null) {
return Collections.emptySet();
}
@@ -163,9 +163,38 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
return Collections.emptySet();
}
- String spec = getSpec(hostname, resource);
+ String metricsParam = getSetString(processRegexps(metrics.keySet()), -1);
+ // Reuse uriBuilder
+ uriBuilder.removeQuery();
+
+ if (metricsParam.length() > 0) {
+ uriBuilder.setParameter("metricNames", metricsParam);
+ }
+
+ if (hostname != null && !hostname.isEmpty() && !hostname.equals(dummyHostName)) {
+ uriBuilder.setParameter("hostname", hostname);
+ }
+
+ String componentName = getComponentName(resource);
+ if (componentName != null && !componentName.isEmpty()) {
+ if (TIMELINE_APPID_MAP.containsKey(componentName)) {
+ componentName = TIMELINE_APPID_MAP.get(componentName);
+ }
+ uriBuilder.setParameter("appId", componentName);
+ }
+
+ long startTime = temporalInfo.getStartTime();
+ if (startTime != -1) {
+ uriBuilder.setParameter("startTime", String.valueOf(startTime));
+ }
+
+ long endTime = temporalInfo.getEndTime();
+ if (endTime != -1) {
+ uriBuilder.setParameter("endTime", String.valueOf(endTime));
+ }
BufferedReader reader = null;
+ String spec = uriBuilder.toString();
try {
LOG.debug("Metrics request url =" + spec);
reader = new BufferedReader(new InputStreamReader(streamProvider.readFrom(spec)));
@@ -176,9 +205,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
Set<String> patterns = createPatterns(metrics.keySet());
for (TimelineMetric metric : timelineMetrics.getMetrics()) {
- if (metric.getMetricName() != null
- && metric.getMetricValues() != null
- && checkMetricName(patterns, metric.getMetricName())) {
+ if (metric.getMetricName() != null && metric.getMetricValues() != null
+ && checkMetricName(patterns, metric.getMetricName())) {
populateResource(resource, metric);
}
}
@@ -202,42 +230,6 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
return Collections.emptySet();
}
- private String getSpec(String hostname, Resource resource) {
- String metricsParam = getSetString(processRegexps(metrics.keySet()), -1);
- // Reuse uriBuilder
- uriBuilder.removeQuery();
-
- if (metricsParam.length() > 0) {
- uriBuilder.setParameter("metricNames", metricsParam);
- }
-
- if (hostname != null && !hostname.isEmpty() && !hostname.equals(dummyHostName)) {
- uriBuilder.setParameter("hostname", hostname);
- }
-
- String componentName = getComponentName(resource);
- if (componentName != null && !componentName.isEmpty()) {
- if (TIMELINE_APPID_MAP.containsKey(componentName)) {
- componentName = TIMELINE_APPID_MAP.get(componentName);
- }
- uriBuilder.setParameter("appId", componentName);
- }
-
- if (temporalInfo != null) {
- long startTime = temporalInfo.getStartTime();
- if (startTime != -1) {
- uriBuilder.setParameter("startTime", String.valueOf(startTime));
- }
-
- long endTime = temporalInfo.getEndTime();
- if (endTime != -1) {
- uriBuilder.setParameter("endTime", String.valueOf(endTime));
- }
- }
-
- return uriBuilder.toString();
- }
-
private Set<String> createPatterns(Set<String> rawNames) {
Pattern pattern = Pattern.compile(METRIC_REGEXP_PATTERN);
Set<String> result = new HashSet<String>();
http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
index 31df3e2..ae1e163 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
@@ -97,95 +97,6 @@ public class AMSPropertyProviderTest {
}
@Test
- public void testPopulateResourcesForSingleHostMetricPointInTime() throws
- Exception {
-
- // given
- TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
- TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
- ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
- Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host);
- AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider(
- propertyIds,
- streamProvider,
- sslConfiguration,
- metricHostProvider,
- CLUSTER_NAME_PROPERTY_ID,
- HOST_NAME_PROPERTY_ID
- );
-
- Resource resource = new ResourceImpl(Resource.Type.Host);
- resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
- Map<String, TemporalInfo> temporalInfoMap = Collections.emptyMap();
- Request request = PropertyHelper.getReadRequest(Collections.singleton
- (PROPERTY_ID1), temporalInfoMap);
- System.out.println(request);
-
- // when
- Set<Resource> resources =
- propertyProvider.populateResources(Collections.singleton(resource), request, null);
-
- // then
- Assert.assertEquals(1, resources.size());
- Resource res = resources.iterator().next();
- Map<String, Object> properties = PropertyHelper.getProperties(res);
- Assert.assertNotNull(properties);
- URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
- uriBuilder.addParameter("metricNames", "cpu_user");
- uriBuilder.addParameter("hostname", "h1");
- uriBuilder.addParameter("appId", "HOST");
- Assert.assertEquals(uriBuilder.toString(), streamProvider.getLastSpec());
- Double val = (Double) res.getPropertyValue(PROPERTY_ID1);
- Assert.assertEquals(40.45, val, 0.001);
- }
-
- @Test
- public void testPopulateResourcesForMultipleHostMetricscPointInTime() throws Exception {
- TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH);
- TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
- ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
-
- Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host);
- AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider(
- propertyIds,
- streamProvider,
- sslConfiguration,
- metricHostProvider,
- CLUSTER_NAME_PROPERTY_ID,
- HOST_NAME_PROPERTY_ID
- );
-
- Resource resource = new ResourceImpl(Resource.Type.Host);
- resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
- Map<String, TemporalInfo> temporalInfoMap = Collections.emptyMap();
- Request request = PropertyHelper.getReadRequest(
- new HashSet<String>() {{ add(PROPERTY_ID1); add(PROPERTY_ID2); }}, temporalInfoMap);
- Set<Resource> resources =
- propertyProvider.populateResources(Collections.singleton(resource), request, null);
- Assert.assertEquals(1, resources.size());
- Resource res = resources.iterator().next();
- Map<String, Object> properties = PropertyHelper.getProperties(resources.iterator().next());
- Assert.assertNotNull(properties);
- URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
- uriBuilder.addParameter("metricNames", "cpu_user,mem_free");
- uriBuilder.addParameter("hostname", "h1");
- uriBuilder.addParameter("appId", "HOST");
-
- URIBuilder uriBuilder2 = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
- uriBuilder2.addParameter("metricNames", "mem_free,cpu_user");
- uriBuilder2.addParameter("hostname", "h1");
- uriBuilder2.addParameter("appId", "HOST");
- System.out.println(streamProvider.getLastSpec());
- Assert.assertTrue(uriBuilder.toString().equals(streamProvider.getLastSpec())
- || uriBuilder2.toString().equals(streamProvider.getLastSpec()));
- Double val1 = (Double) res.getPropertyValue(PROPERTY_ID1);
- Assert.assertEquals(40.45, val1, 0.001);
- Double val2 = (Double)res.getPropertyValue(PROPERTY_ID2);
- Assert.assertEquals(2.47025664E8, val2, 0.1);
- }
-
-
- @Test
public void testPopulateResourcesForMultipleHostMetrics() throws Exception {
TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
@@ -228,14 +139,13 @@ public class AMSPropertyProviderTest {
uriBuilder2.addParameter("startTime", "1416445244701");
uriBuilder2.addParameter("endTime", "1416445244901");
Assert.assertTrue(uriBuilder.toString().equals(streamProvider.getLastSpec())
- || uriBuilder2.toString().equals(streamProvider.getLastSpec()));
+ || uriBuilder2.toString().equals(streamProvider.getLastSpec()));
Number[][] val = (Number[][]) res.getPropertyValue(PROPERTY_ID1);
Assert.assertEquals(111, val.length);
val = (Number[][]) res.getPropertyValue(PROPERTY_ID2);
Assert.assertEquals(86, val.length);
}
-
@Test
public void testPopulateResourcesForRegexpMetrics() throws Exception {
TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_COMPONENT_REGEXP_METRICS_FILE_PATH);