You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/06/19 18:25:21 UTC
[5/5] ambari git commit: AMBARI-21214 : Use a uuid vs long row key for metrics in AMS schema. (avijayan)
AMBARI-21214 : Use a uuid vs long row key for metrics in AMS schema. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/041e4e9a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/041e4e9a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/041e4e9a
Branch: refs/heads/branch-3.0-ams
Commit: 041e4e9a1a3e6aa36b2bbf3f1b0e86b8d70fc6b8
Parents: 82e6229
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Mon Jun 19 10:55:44 2017 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Mon Jun 19 10:55:44 2017 -0700
----------------------------------------------------------------------
.../timeline/SingleValuedTimelineMetric.java | 9 +-
.../metrics2/sink/timeline/TimelineMetric.java | 8 +
.../sink/timeline/TimelineMetricMetadata.java | 37 +-
.../timeline/HBaseTimelineMetricsService.java | 43 +-
.../metrics/timeline/PhoenixHBaseAccessor.java | 178 +-
.../timeline/TimelineMetricConfiguration.java | 3 +
.../metrics/timeline/TimelineMetricStore.java | 12 +-
.../metrics/timeline/TimelineMetricsFilter.java | 7 -
.../aggregators/AbstractTimelineAggregator.java | 45 +-
.../aggregators/TimelineClusterMetric.java | 6 +-
.../TimelineMetricAggregatorFactory.java | 12 +
.../TimelineMetricAppAggregator.java | 28 +-
.../TimelineMetricClusterAggregator.java | 9 +-
.../TimelineMetricClusterAggregatorSecond.java | 24 +-
.../TimelineMetricHostAggregator.java | 10 +-
.../aggregators/TimelineMetricReadHelper.java | 61 +-
.../discovery/TimelineMetricHostMetadata.java | 51 +
.../discovery/TimelineMetricMetadataKey.java | 26 +-
.../TimelineMetricMetadataManager.java | 290 +++-
.../discovery/TimelineMetricMetadataSync.java | 18 +-
.../metrics/timeline/query/Condition.java | 1 +
.../timeline/query/ConditionBuilder.java | 10 +-
.../timeline/query/DefaultCondition.java | 60 +-
.../metrics/timeline/query/EmptyCondition.java | 5 +
.../timeline/query/PhoenixTransactSQL.java | 277 +--
.../query/SplitByMetricNamesCondition.java | 40 +-
.../metrics/timeline/query/TopNCondition.java | 63 +-
.../timeline/uuid/HashBasedUuidGenStrategy.java | 202 +++
.../timeline/uuid/MetricUuidGenStrategy.java | 49 +
.../timeline/uuid/RandomUuidGenStrategy.java | 53 +
.../webapp/TimelineWebServices.java | 17 +
.../resources/metrics_def/AMBARI_SERVER.dat | 40 +
.../resources/metrics_def/JOBHISTORYSERVER.dat | 58 +
.../main/resources/metrics_def/MASTER_HBASE.dat | 230 ++-
.../main/resources/metrics_def/SLAVE_HBASE.dat | 700 ++++++--
.../timeline/ITPhoenixHBaseAccessor.java | 6 +-
.../metrics/timeline/MetricTestHelper.java | 2 +-
.../timeline/PhoenixHBaseAccessorTest.java | 10 +-
.../timeline/TestPhoenixTransactSQL.java | 105 +-
.../timeline/TestTimelineMetricStore.java | 10 +
.../TimelineMetricsAggregatorMemorySink.java | 4 +-
.../timeline/aggregators/DownSamplerTest.java | 2 +
.../aggregators/ITClusterAggregator.java | 15 +-
.../aggregators/ITMetricAggregator.java | 8 +-
...melineMetricClusterAggregatorSecondTest.java | 65 +-
.../timeline/discovery/TestMetadataManager.java | 173 +-
.../timeline/discovery/TestMetadataSync.java | 32 +-
.../uuid/TimelineMetricUuidManagerTest.java | 184 ++
.../test/resources/test_data/full_whitelist.dat | 1615 ++++++++++++++++++
49 files changed, 4051 insertions(+), 862 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java
index 8ecca54..4bb9355 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java
@@ -30,7 +30,6 @@ public class SingleValuedTimelineMetric {
private String instanceId;
private String hostName;
private Long startTime;
- private String type;
public void setSingleTimeseriesValue(Long timestamp, Double value) {
this.timestamp = timestamp;
@@ -39,14 +38,13 @@ public class SingleValuedTimelineMetric {
public SingleValuedTimelineMetric(String metricName, String appId,
String instanceId, String hostName,
- long timestamp, long startTime, String type) {
+ long timestamp, long startTime) {
this.metricName = metricName;
this.appId = appId;
this.instanceId = instanceId;
this.hostName = hostName;
this.timestamp = timestamp;
this.startTime = startTime;
- this.type = type;
}
public Long getTimestamp() {
@@ -57,10 +55,6 @@ public class SingleValuedTimelineMetric {
return startTime;
}
- public String getType() {
- return type;
- }
-
public Double getValue() {
return value;
}
@@ -97,7 +91,6 @@ public class SingleValuedTimelineMetric {
metric.setMetricName(this.metricName);
metric.setAppId(this.appId);
metric.setHostName(this.hostName);
- metric.setType(this.type);
metric.setInstanceId(this.instanceId);
metric.setStartTime(this.startTime);
metric.setTimestamp(this.timestamp);
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index edace52..3d3b19c 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -52,6 +52,14 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
}
+ // To reconstruct TimelineMetric from UUID.
+ public TimelineMetric(String metricName, String hostname, String appId, String instanceId) {
+ this.metricName = metricName;
+ this.hostName = hostname;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ }
+
// copy constructor
public TimelineMetric(TimelineMetric metric) {
setMetricName(metric.getMetricName());
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java
index 727becc..6c9712f 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.metrics2.sink.timeline;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.codehaus.jackson.annotate.JsonIgnore;
@@ -32,6 +34,8 @@ import javax.xml.bind.annotation.XmlRootElement;
public class TimelineMetricMetadata {
private String metricName;
private String appId;
+ private String instanceId;
+ private byte[] uuid;
private String units;
private String type = "UNDEFINED";
private Long seriesStartTime;
@@ -51,11 +55,12 @@ public class TimelineMetricMetadata {
public TimelineMetricMetadata() {
}
- public TimelineMetricMetadata(String metricName, String appId, String units,
+ public TimelineMetricMetadata(String metricName, String appId, String instanceId, String units,
String type, Long seriesStartTime,
boolean supportsAggregates, boolean isWhitelisted) {
this.metricName = metricName;
this.appId = appId;
+ this.instanceId = instanceId;
this.units = units;
this.type = type;
this.seriesStartTime = seriesStartTime;
@@ -82,6 +87,24 @@ public class TimelineMetricMetadata {
this.appId = appId;
}
+ @XmlElement(name = "instanceId")
+ public String getInstanceId() {
+ return instanceId;
+ }
+
+ public void setInstanceId(String instanceId) {
+ this.instanceId = instanceId;
+ }
+
+ @XmlElement(name = "uuid")
+ public byte[] getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(byte[] uuid) {
+ this.uuid = uuid;
+ }
+
@XmlElement(name = "units")
public String getUnits() {
return units;
@@ -102,7 +125,7 @@ public class TimelineMetricMetadata {
@XmlElement(name = "seriesStartTime")
public Long getSeriesStartTime() {
- return seriesStartTime;
+ return (seriesStartTime != null) ? seriesStartTime : 0l;
}
public void setSeriesStartTime(Long seriesStartTime) {
@@ -138,9 +161,10 @@ public class TimelineMetricMetadata {
*/
public boolean needsToBeSynced(TimelineMetricMetadata metadata) throws MetadataException {
if (!this.metricName.equals(metadata.getMetricName()) ||
- !this.appId.equals(metadata.getAppId())) {
+ !this.appId.equals(metadata.getAppId()) ||
+ !(StringUtils.isNotEmpty(instanceId) ? instanceId.equals(metadata.instanceId) : StringUtils.isEmpty(metadata.instanceId))) {
throw new MetadataException("Unexpected argument: metricName = " +
- metadata.getMetricName() + ", appId = " + metadata.getAppId());
+ metadata.getMetricName() + ", appId = " + metadata.getAppId() + ", instanceId = " + metadata.getInstanceId());
}
// Series start time should never change
@@ -159,14 +183,15 @@ public class TimelineMetricMetadata {
TimelineMetricMetadata that = (TimelineMetricMetadata) o;
if (!metricName.equals(that.metricName)) return false;
- return !(appId != null ? !appId.equals(that.appId) : that.appId != null);
-
+ if (!appId.equals(that.appId)) return false;
+ return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId));
}
@Override
public int hashCode() {
int result = metricName.hashCode();
result = 31 * result + (appId != null ? appId.hashCode() : 0);
+ result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
return result;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index f962f44..66c46db 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
@@ -82,7 +84,6 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
private TimelineMetricMetadataManager metricMetadataManager;
private Integer defaultTopNHostsLimit;
private MetricCollectorHAController haController;
- private AmsKafkaProducer kafkaProducer;
/**
* Construct the service.
@@ -143,8 +144,6 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
}
- kafkaProducer = new AmsKafkaProducer(metricsConf.get("kafka.bootstrap.servers")); //104.196.85.21:6667
-
// Start the cluster aggregator second
TimelineMetricAggregator secondClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
@@ -154,19 +153,19 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
// Start the minute cluster aggregator
TimelineMetricAggregator minuteClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
- hBaseAccessor, metricsConf, haController);
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
scheduleAggregatorThread(minuteClusterAggregator);
// Start the hourly cluster aggregator
TimelineMetricAggregator hourlyClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
- hBaseAccessor, metricsConf, haController);
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
scheduleAggregatorThread(hourlyClusterAggregator);
// Start the daily cluster aggregator
TimelineMetricAggregator dailyClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
- hBaseAccessor, metricsConf, haController);
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
scheduleAggregatorThread(dailyClusterAggregator);
// Start the minute host aggregator
@@ -175,20 +174,20 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
} else {
TimelineMetricAggregator minuteHostAggregator =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
- hBaseAccessor, metricsConf, haController);
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
scheduleAggregatorThread(minuteHostAggregator);
}
// Start the hourly host aggregator
TimelineMetricAggregator hourlyHostAggregator =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
- hBaseAccessor, metricsConf, haController);
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
scheduleAggregatorThread(hourlyHostAggregator);
// Start the daily host aggregator
TimelineMetricAggregator dailyHostAggregator =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
- hBaseAccessor, metricsConf, haController);
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
scheduleAggregatorThread(dailyHostAggregator);
if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
@@ -238,6 +237,8 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
Multimap<String, List<Function>> metricFunctions =
parseMetricNamesToAggregationFunctions(metricNames);
+ List<byte[]> uuids = metricMetadataManager.getUuids(metricFunctions.keySet(), hostnames, applicationId, instanceId);
+
ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet()))
.hostnames(hostnames)
.appId(applicationId)
@@ -246,7 +247,8 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
.endTime(endTime)
.precision(precision)
.limit(limit)
- .grouped(groupedByHosts);
+ .grouped(groupedByHosts)
+ .uuid(uuids);
if (topNConfig != null) {
if (TopNCondition.isTopNHostCondition(metricNames, hostnames) ^ //Only 1 condition should be true.
@@ -368,13 +370,6 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
// Error indicated by the Sql exception
TimelinePutResponse response = new TimelinePutResponse();
- try {
- if (!metrics.getMetrics().isEmpty() && metrics.getMetrics().get(0).getAppId().equals("HOST")) {
- kafkaProducer.sendMetrics(fromTimelineMetrics(metrics));
- }
- } catch (InterruptedException | ExecutionException e) {
- LOG.error(e);
- }
hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
return response;
@@ -439,8 +434,18 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
}
@Override
+ public Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException {
+ return metricMetadataManager.getUuidKeyMap();
+ }
+
+ @Override
public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
- return metricMetadataManager.getHostedAppsCache();
+ Map<String, TimelineMetricHostMetadata> hostsMetadata = metricMetadataManager.getHostedAppsCache();
+ Map<String, Set<String>> hostAppMap = new HashMap<>();
+ for (String hostname : hostsMetadata.keySet()) {
+ hostAppMap.put(hostname, hostsMetadata.get(hostname).getHostedApps());
+ }
+ return hostAppMap;
}
@Override
@@ -459,7 +464,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
throws SQLException, IOException {
- Map<String, Set<String>> hostedApps = metricMetadataManager.getHostedAppsCache();
+ Map<String, Set<String>> hostedApps = getHostAppsMetadata();
Map<String, Set<String>> instanceHosts = metricMetadataManager.getHostedInstanceCache();
Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 15b0bb8..7ad88a1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -132,6 +132,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
@@ -171,7 +172,7 @@ public class PhoenixHBaseAccessor {
private static final int POINTS_PER_MINUTE = 6;
public static int RESULTSET_LIMIT = (int)TimeUnit.HOURS.toMinutes(2) * METRICS_PER_MINUTE * POINTS_PER_MINUTE ;
- static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
+ static TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
static ObjectMapper mapper = new ObjectMapper();
static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new TypeReference<TreeMap<Long, Double>>() {};
@@ -190,6 +191,7 @@ public class PhoenixHBaseAccessor {
private final boolean skipBlockCacheForAggregatorsEnabled;
private final String timelineMetricsTablesDurability;
private final String timelineMetricsPrecisionTableDurability;
+ private TimelineMetricMetadataManager metadataManagerInstance;
static final String HSTORE_COMPACTION_CLASS_KEY =
"hbase.hstore.defaultengine.compactionpolicy.class";
@@ -282,6 +284,7 @@ public class PhoenixHBaseAccessor {
}
rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink);
}
+ TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance);
}
public boolean isInsertCacheEmpty() {
@@ -336,19 +339,20 @@ public class PhoenixHBaseAccessor {
double[] aggregates = AggregatorUtils.calculateAggregates(
metric.getMetricValues());
- metricRecordStmt.setString(1, metric.getMetricName());
- metricRecordStmt.setString(2, metric.getHostName());
- metricRecordStmt.setString(3, metric.getAppId());
- metricRecordStmt.setString(4, metric.getInstanceId());
- metricRecordStmt.setLong(5, currentTime);
- metricRecordStmt.setLong(6, metric.getStartTime());
- metricRecordStmt.setString(7, metric.getUnits());
- metricRecordStmt.setDouble(8, aggregates[0]);
- metricRecordStmt.setDouble(9, aggregates[1]);
- metricRecordStmt.setDouble(10, aggregates[2]);
- metricRecordStmt.setLong(11, (long) aggregates[3]);
+ byte[] uuid = metadataManagerInstance.getUuid(metric);
+ if (uuid == null) {
+ LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
+ continue;
+ }
+ metricRecordStmt.setBytes(1, uuid);
+ metricRecordStmt.setLong(2, currentTime);
+ metricRecordStmt.setLong(3, metric.getStartTime());
+ metricRecordStmt.setDouble(4, aggregates[0]);
+ metricRecordStmt.setDouble(5, aggregates[1]);
+ metricRecordStmt.setDouble(6, aggregates[2]);
+ metricRecordStmt.setLong(7, (long) aggregates[3]);
String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
- metricRecordStmt.setString(12, json);
+ metricRecordStmt.setString(8, json);
try {
metricRecordStmt.executeUpdate();
@@ -477,20 +481,12 @@ public class PhoenixHBaseAccessor {
// Host level
String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression);
- String splitPoints = metricsConf.get(PRECISION_TABLE_SPLIT_POINTS);
- if (!StringUtils.isEmpty(splitPoints)) {
- precisionSql += getSplitPointsStr(splitPoints);
- }
stmt.executeUpdate(precisionSql);
String hostMinuteAggregrateSql = String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding,
tableTTL.get(METRICS_AGGREGATE_MINUTE_TABLE_NAME),
compression);
- splitPoints = metricsConf.get(AGGREGATE_TABLE_SPLIT_POINTS);
- if (!StringUtils.isEmpty(splitPoints)) {
- hostMinuteAggregrateSql += getSplitPointsStr(splitPoints);
- }
stmt.executeUpdate(hostMinuteAggregrateSql);
stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
@@ -507,10 +503,7 @@ public class PhoenixHBaseAccessor {
METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding,
tableTTL.get(METRICS_CLUSTER_AGGREGATE_TABLE_NAME),
compression);
- splitPoints = metricsConf.get(AGGREGATE_TABLE_SPLIT_POINTS);
- if (!StringUtils.isEmpty(splitPoints)) {
- aggregateSql += getSplitPointsStr(splitPoints);
- }
+
stmt.executeUpdate(aggregateSql);
stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding,
@@ -961,7 +954,8 @@ public class PhoenixHBaseAccessor {
private void appendMetricFromResultSet(TimelineMetrics metrics, Condition condition,
Multimap<String, List<Function>> metricFunctions,
ResultSet rs) throws SQLException, IOException {
- String metricName = rs.getString("METRIC_NAME");
+ byte[] uuid = rs.getBytes("UUID");
+ String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid);
Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName);
for (List<Function> functions : functionList) {
@@ -1103,7 +1097,8 @@ public class PhoenixHBaseAccessor {
Condition condition, Multimap<String, List<Function>> metricFunctions,
ResultSet rs) throws SQLException {
- String metricName = rs.getString("METRIC_NAME");
+ byte[] uuid = rs.getBytes("UUID");
+ String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid);
Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName);
for (List<Function> functions : functionList) {
@@ -1136,14 +1131,15 @@ public class PhoenixHBaseAccessor {
SplitByMetricNamesCondition splitCondition =
new SplitByMetricNamesCondition(condition);
- for (String metricName: splitCondition.getOriginalMetricNames()) {
+ for (byte[] uuid: condition.getUuids()) {
- splitCondition.setCurrentMetric(metricName);
+ splitCondition.setCurrentUuid(uuid);
stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn, splitCondition);
ResultSet rs = null;
try {
rs = stmt.executeQuery();
while (rs.next()) {
+ String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid);
Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName);
for (List<Function> functions : functionList) {
if (functions != null) {
@@ -1187,14 +1183,16 @@ public class PhoenixHBaseAccessor {
countColumnName = "HOSTS_COUNT";
}
+ byte[] uuid = rs.getBytes("UUID");
+ TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid);
+
SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
- rs.getString("METRIC_NAME") + f.getSuffix(),
- rs.getString("APP_ID"),
- rs.getString("INSTANCE_ID"),
+ timelineMetric.getMetricName() + f.getSuffix(),
+ timelineMetric.getAppId(),
+ timelineMetric.getInstanceId(),
null,
rs.getLong("SERVER_TIME"),
- rs.getLong("SERVER_TIME"),
- rs.getString("UNITS")
+ rs.getLong("SERVER_TIME")
);
double value;
@@ -1277,18 +1275,19 @@ public class PhoenixHBaseAccessor {
TimelineMetric metric = metricAggregate.getKey();
MetricHostAggregate hostAggregate = metricAggregate.getValue();
+ byte[] uuid = metadataManagerInstance.getUuid(metric);
+ if (uuid == null) {
+ LOG.error("Error computing UUID for metric. Cannot write metric : " + metric.toString());
+ continue;
+ }
rowCount++;
stmt.clearParameters();
- stmt.setString(1, metric.getMetricName());
- stmt.setString(2, metric.getHostName());
- stmt.setString(3, metric.getAppId());
- stmt.setString(4, metric.getInstanceId());
- stmt.setLong(5, metric.getTimestamp());
- stmt.setString(6, metric.getType());
- stmt.setDouble(7, hostAggregate.getSum());
- stmt.setDouble(8, hostAggregate.getMax());
- stmt.setDouble(9, hostAggregate.getMin());
- stmt.setDouble(10, hostAggregate.getNumberOfSamples());
+ stmt.setBytes(1, uuid);
+ stmt.setLong(2, metric.getTimestamp());
+ stmt.setDouble(3, hostAggregate.getSum());
+ stmt.setDouble(4, hostAggregate.getMax());
+ stmt.setDouble(5, hostAggregate.getMin());
+ stmt.setDouble(6, hostAggregate.getNumberOfSamples());
try {
stmt.executeUpdate();
@@ -1372,16 +1371,18 @@ public class PhoenixHBaseAccessor {
}
rowCount++;
+ byte[] uuid = metadataManagerInstance.getUuid(clusterMetric);
+ if (uuid == null) {
+ LOG.error("Error computing UUID for metric. Cannot write metrics : " + clusterMetric.toString());
+ continue;
+ }
stmt.clearParameters();
- stmt.setString(1, clusterMetric.getMetricName());
- stmt.setString(2, clusterMetric.getAppId());
- stmt.setString(3, clusterMetric.getInstanceId());
- stmt.setLong(4, clusterMetric.getTimestamp());
- stmt.setString(5, clusterMetric.getType());
- stmt.setDouble(6, aggregate.getSum());
- stmt.setInt(7, aggregate.getNumberOfHosts());
- stmt.setDouble(8, aggregate.getMax());
- stmt.setDouble(9, aggregate.getMin());
+ stmt.setBytes(1, uuid);
+ stmt.setLong(2, clusterMetric.getTimestamp());
+ stmt.setDouble(3, aggregate.getSum());
+ stmt.setInt(4, aggregate.getNumberOfHosts());
+ stmt.setDouble(5, aggregate.getMax());
+ stmt.setDouble(6, aggregate.getMin());
try {
stmt.executeUpdate();
@@ -1458,17 +1459,20 @@ public class PhoenixHBaseAccessor {
"aggregate = " + aggregate);
}
+ byte[] uuid = metadataManagerInstance.getUuid(clusterMetric);
+ if (uuid == null) {
+ LOG.error("Error computing UUID for metric. Cannot write metric : " + clusterMetric.toString());
+ continue;
+ }
+
rowCount++;
stmt.clearParameters();
- stmt.setString(1, clusterMetric.getMetricName());
- stmt.setString(2, clusterMetric.getAppId());
- stmt.setString(3, clusterMetric.getInstanceId());
- stmt.setLong(4, clusterMetric.getTimestamp());
- stmt.setString(5, clusterMetric.getType());
- stmt.setDouble(6, aggregate.getSum());
- stmt.setLong(7, aggregate.getNumberOfSamples());
- stmt.setDouble(8, aggregate.getMax());
- stmt.setDouble(9, aggregate.getMin());
+ stmt.setBytes(1, uuid);
+ stmt.setLong(2, clusterMetric.getTimestamp());
+ stmt.setDouble(3, aggregate.getSum());
+ stmt.setLong(4, aggregate.getNumberOfSamples());
+ stmt.setDouble(5, aggregate.getMax());
+ stmt.setDouble(6, aggregate.getMin());
try {
stmt.executeUpdate();
@@ -1556,21 +1560,23 @@ public class PhoenixHBaseAccessor {
* One time save of metadata when discovering topology during aggregation.
* @throws SQLException
*/
- public void saveHostAppsMetadata(Map<String, Set<String>> hostedApps) throws SQLException {
+ public void saveHostAppsMetadata(Map<String, TimelineMetricHostMetadata> hostMetadata) throws SQLException {
Connection conn = getConnection();
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(UPSERT_HOSTED_APPS_METADATA_SQL);
int rowCount = 0;
- for (Map.Entry<String, Set<String>> hostedAppsEntry : hostedApps.entrySet()) {
+ for (Map.Entry<String, TimelineMetricHostMetadata> hostedAppsEntry : hostMetadata.entrySet()) {
+ TimelineMetricHostMetadata timelineMetricHostMetadata = hostedAppsEntry.getValue();
if (LOG.isTraceEnabled()) {
LOG.trace("HostedAppsMetadata: " + hostedAppsEntry);
}
stmt.clearParameters();
stmt.setString(1, hostedAppsEntry.getKey());
- stmt.setString(2, StringUtils.join(hostedAppsEntry.getValue(), ","));
+ stmt.setBytes(2, timelineMetricHostMetadata.getUuid());
+ stmt.setString(3, StringUtils.join(timelineMetricHostMetadata.getHostedApps(), ","));
try {
stmt.executeUpdate();
rowCount++;
@@ -1674,15 +1680,21 @@ public class PhoenixHBaseAccessor {
+ ", seriesStartTime = " + metadata.getSeriesStartTime()
);
}
-
- stmt.clearParameters();
- stmt.setString(1, metadata.getMetricName());
- stmt.setString(2, metadata.getAppId());
- stmt.setString(3, metadata.getUnits());
- stmt.setString(4, metadata.getType());
- stmt.setLong(5, metadata.getSeriesStartTime());
- stmt.setBoolean(6, metadata.isSupportsAggregates());
- stmt.setBoolean(7, metadata.isWhitelisted());
+ try {
+ stmt.clearParameters();
+ stmt.setString(1, metadata.getMetricName());
+ stmt.setString(2, metadata.getAppId());
+ stmt.setString(3, metadata.getInstanceId());
+ stmt.setBytes(4, metadata.getUuid());
+ stmt.setString(5, metadata.getUnits());
+ stmt.setString(6, metadata.getType());
+ stmt.setLong(7, metadata.getSeriesStartTime());
+ stmt.setBoolean(8, metadata.isSupportsAggregates());
+ stmt.setBoolean(9, metadata.isWhitelisted());
+ } catch (Exception e) {
+ LOG.error("Exception in saving metric metadata entry. ");
+ continue;
+ }
try {
stmt.executeUpdate();
@@ -1713,8 +1725,8 @@ public class PhoenixHBaseAccessor {
}
}
- public Map<String, Set<String>> getHostedAppsMetadata() throws SQLException {
- Map<String, Set<String>> hostedAppMap = new HashMap<>();
+ public Map<String, TimelineMetricHostMetadata> getHostedAppsMetadata() throws SQLException {
+ Map<String, TimelineMetricHostMetadata> hostedAppMap = new HashMap<>();
Connection conn = getConnection();
PreparedStatement stmt = null;
ResultSet rs = null;
@@ -1724,8 +1736,9 @@ public class PhoenixHBaseAccessor {
rs = stmt.executeQuery();
while (rs.next()) {
- hostedAppMap.put(rs.getString("HOSTNAME"),
- new HashSet<>(Arrays.asList(StringUtils.split(rs.getString("APP_IDS"), ","))));
+ TimelineMetricHostMetadata hostMetadata = new TimelineMetricHostMetadata(new HashSet<>(Arrays.asList(StringUtils.split(rs.getString("APP_IDS"), ","))));
+ hostMetadata.setUuid(rs.getBytes("UUID"));
+ hostedAppMap.put(rs.getString("HOSTNAME"), hostMetadata);
}
} finally {
@@ -1816,9 +1829,11 @@ public class PhoenixHBaseAccessor {
while (rs.next()) {
String metricName = rs.getString("METRIC_NAME");
String appId = rs.getString("APP_ID");
+ String instanceId = rs.getString("INSTANCE_ID");
TimelineMetricMetadata metadata = new TimelineMetricMetadata(
metricName,
appId,
+ instanceId,
rs.getString("UNITS"),
rs.getString("TYPE"),
rs.getLong("START_TIME"),
@@ -1826,8 +1841,9 @@ public class PhoenixHBaseAccessor {
rs.getBoolean("IS_WHITELISTED")
);
- TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(metricName, appId);
+ TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(metricName, appId, instanceId);
metadata.setIsPersisted(true); // Always true on retrieval
+ metadata.setUuid(rs.getBytes("UUID"));
metadataMap.put(key, metadata);
}
@@ -1858,4 +1874,8 @@ public class PhoenixHBaseAccessor {
return metadataMap;
}
+ public void setMetadataInstance(TimelineMetricMetadataManager metadataManager) {
+ this.metadataManagerInstance = metadataManager;
+ TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index de33bd1..2060867 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -304,6 +304,9 @@ public class TimelineMetricConfiguration {
public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES =
"timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles";
+ public static final String TIMELINE_METRICS_UUID_GEN_STRATEGY =
+ "timeline.metrics.uuid.gen.strategy";
+
public static final String HOST_APP_ID = "HOST";
public static final String DEFAULT_INSTANCE_PORT = "12001";
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index d052d54..dab4494 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
+
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
@@ -98,9 +100,11 @@ public interface TimelineMetricStore {
*/
Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException;
- /**
- * Return a list of known live collector nodes
- * @return [ hostname ]
- */
+ Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException;
+
+ /**
+ * Return a list of known live collector nodes
+ * @return [ hostname ]
+ */
List<String> getLiveInstances();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
index 1446ec2..04cf422 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
@@ -167,11 +167,4 @@ public class TimelineMetricsFilter {
return false;
}
- public static void addToWhitelist(String metricName) {
-
- if (StringUtils.isNotEmpty(metricName)) {
- whitelistedMetrics.add(metricName);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index cb131d3..83f2392 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -272,7 +272,8 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
conn.commit();
LOG.info(rows + " row(s) updated in aggregation.");
- downsample(conn, startTime, endTime);
+ //TODO : Fix downsampling after UUID change.
+ //downsample(conn, startTime, endTime);
} else {
rs = stmt.executeQuery();
}
@@ -280,7 +281,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
aggregate(rs, startTime, endTime);
- } catch (SQLException | IOException e) {
+ } catch (Exception e) {
LOG.error("Exception during aggregating metrics.", e);
success = false;
} finally {
@@ -455,25 +456,29 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
* @return
*/
protected String getDownsampledMetricSkipClause() {
- if (CollectionUtils.isEmpty(this.downsampleMetricPatterns)) {
- return StringUtils.EMPTY;
- }
-
- StringBuilder sb = new StringBuilder();
-
- for (int i = 0; i < downsampleMetricPatterns.size(); i++) {
- sb.append(" METRIC_NAME");
- sb.append(" NOT");
- sb.append(" LIKE ");
- sb.append("'" + downsampleMetricPatterns.get(i) + "'");
- if (i < downsampleMetricPatterns.size() - 1) {
- sb.append(" AND ");
- }
- }
-
- sb.append(" AND ");
- return sb.toString();
+ //TODO Fix downsampling for UUID change.
+ return StringUtils.EMPTY;
+
+// if (CollectionUtils.isEmpty(this.downsampleMetricPatterns)) {
+// return StringUtils.EMPTY;
+// }
+//
+// StringBuilder sb = new StringBuilder();
+//
+// for (int i = 0; i < downsampleMetricPatterns.size(); i++) {
+// sb.append(" METRIC_NAME");
+// sb.append(" NOT");
+// sb.append(" LIKE ");
+// sb.append("'" + downsampleMetricPatterns.get(i) + "'");
+//
+// if (i < downsampleMetricPatterns.size() - 1) {
+// sb.append(" AND ");
+// }
+// }
+//
+// sb.append(" AND ");
+// return sb.toString();
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
index b7d9110..6e793e1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
@@ -22,15 +22,13 @@ public class TimelineClusterMetric {
private String appId;
private String instanceId;
private long timestamp;
- private String type;
public TimelineClusterMetric(String metricName, String appId, String instanceId,
- long timestamp, String type) {
+ long timestamp) {
this.metricName = metricName;
this.appId = appId;
this.instanceId = instanceId;
this.timestamp = timestamp;
- this.type = type;
}
public String getMetricName() {
@@ -49,8 +47,6 @@ public class TimelineClusterMetric {
return timestamp;
}
- public String getType() { return type; }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index 2eb3553..081e610 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -95,6 +95,7 @@ public class TimelineMetricAggregatorFactory {
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricMetadataManager metadataManager,
MetricCollectorHAController haController) {
String checkpointDir = metricsConf.get(
@@ -128,6 +129,7 @@ public class TimelineMetricAggregatorFactory {
return new TimelineMetricHostAggregator(
METRIC_RECORD_MINUTE,
+ metadataManager,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -145,6 +147,7 @@ public class TimelineMetricAggregatorFactory {
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricMetadataManager metadataManager,
MetricCollectorHAController haController) {
String checkpointDir = metricsConf.get(
@@ -178,6 +181,7 @@ public class TimelineMetricAggregatorFactory {
return new TimelineMetricHostAggregator(
METRIC_RECORD_HOURLY,
+ metadataManager,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -195,6 +199,7 @@ public class TimelineMetricAggregatorFactory {
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricMetadataManager metadataManager,
MetricCollectorHAController haController) {
String checkpointDir = metricsConf.get(
@@ -228,6 +233,7 @@ public class TimelineMetricAggregatorFactory {
return new TimelineMetricHostAggregator(
METRIC_RECORD_DAILY,
+ metadataManager,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -291,6 +297,7 @@ public class TimelineMetricAggregatorFactory {
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricMetadataManager metadataManager,
MetricCollectorHAController haController) {
String checkpointDir = metricsConf.get(
@@ -326,6 +333,7 @@ public class TimelineMetricAggregatorFactory {
return new TimelineMetricClusterAggregator(
METRIC_AGGREGATE_MINUTE,
+ metadataManager,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -344,6 +352,7 @@ public class TimelineMetricAggregatorFactory {
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricMetadataManager metadataManager,
MetricCollectorHAController haController) {
String checkpointDir = metricsConf.get(
@@ -379,6 +388,7 @@ public class TimelineMetricAggregatorFactory {
return new TimelineMetricClusterAggregator(
METRIC_AGGREGATE_HOURLY,
+ metadataManager,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -397,6 +407,7 @@ public class TimelineMetricAggregatorFactory {
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricMetadataManager metadataManager,
MetricCollectorHAController haController) {
String checkpointDir = metricsConf.get(
@@ -432,6 +443,7 @@ public class TimelineMetricAggregatorFactory {
return new TimelineMetricClusterAggregator(
METRIC_AGGREGATE_DAILY,
+ metadataManager,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
index 9eaf456..55104de 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsFilter;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
@@ -48,14 +49,14 @@ public class TimelineMetricAppAggregator {
private static final Log LOG = LogFactory.getLog(TimelineMetricAppAggregator.class);
// Lookup to check candidacy of an app
private final List<String> appIdsToAggregate;
- private final Map<String, Set<String>> hostedAppsMap;
+ private final Map<String, TimelineMetricHostMetadata> hostMetadata;
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
TimelineMetricMetadataManager metadataManagerInstance;
public TimelineMetricAppAggregator(TimelineMetricMetadataManager metadataManager,
Configuration metricsConf) {
appIdsToAggregate = getAppIdsForHostAggregation(metricsConf);
- hostedAppsMap = metadataManager.getHostedAppsCache();
+ hostMetadata = metadataManager.getHostedAppsCache();
metadataManagerInstance = metadataManager;
LOG.info("AppIds configured for aggregation: " + appIdsToAggregate);
}
@@ -95,17 +96,20 @@ public class TimelineMetricAppAggregator {
// If metric is a host metric and host has apps on it
if (appId.equalsIgnoreCase(HOST_APP_ID)) {
// Candidate metric, update app aggregates
- if (hostedAppsMap.containsKey(hostname)) {
+ if (hostMetadata.containsKey(hostname)) {
updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue);
}
} else {
// Build the hostedapps map if not a host metric
// Check app candidacy for host aggregation
if (appIdsToAggregate.contains(appId)) {
- Set<String> appIds = hostedAppsMap.get(hostname);
- if (appIds == null) {
+ TimelineMetricHostMetadata timelineMetricHostMetadata = hostMetadata.get(hostname);
+ Set<String> appIds;
+ if (timelineMetricHostMetadata == null) {
appIds = new HashSet<>();
- hostedAppsMap.put(hostname, appIds);
+ hostMetadata.put(hostname, new TimelineMetricHostMetadata(appIds));
+ } else {
+ appIds = timelineMetricHostMetadata.getHostedApps();
}
if (!appIds.contains(appId)) {
appIds.add(appId);
@@ -127,20 +131,20 @@ public class TimelineMetricAppAggregator {
return;
}
- TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID);
- Set<String> apps = hostedAppsMap.get(hostname);
+ TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId());
+ Set<String> apps = hostMetadata.get(hostname).getHostedApps();
for (String appId : apps) {
if (appIdsToAggregate.contains(appId)) {
appKey.setAppId(appId);
TimelineMetricMetadata appMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
if (appMetadata == null) {
- TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID);
+ TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId());
TimelineMetricMetadata hostMetricMetadata = metadataManagerInstance.getMetadataCacheValue(key);
if (hostMetricMetadata != null) {
TimelineMetricMetadata timelineMetricMetadata = new TimelineMetricMetadata(clusterMetric.getMetricName(),
- appId, hostMetricMetadata.getUnits(), hostMetricMetadata.getType(), hostMetricMetadata.getSeriesStartTime(),
+ appId, clusterMetric.getInstanceId(), hostMetricMetadata.getUnits(), hostMetricMetadata.getType(), hostMetricMetadata.getSeriesStartTime(),
hostMetricMetadata.isSupportsAggregates(), TimelineMetricsFilter.acceptMetric(clusterMetric.getMetricName(), appId));
metadataManagerInstance.putIfModifiedTimelineMetricMetadata(timelineMetricMetadata);
}
@@ -151,9 +155,7 @@ public class TimelineMetricAppAggregator {
new TimelineClusterMetric(clusterMetric.getMetricName(),
appId,
clusterMetric.getInstanceId(),
- clusterMetric.getTimestamp(),
- clusterMetric.getType()
- );
+ clusterMetric.getTimestamp());
MetricClusterAggregate clusterAggregate = aggregateClusterMetrics.get(appTimelineClusterMetric);
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 74d4013..0f6dd79 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
@@ -37,10 +38,11 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
- private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true);
+ private final TimelineMetricReadHelper readHelper;
private final boolean isClusterPrecisionInputTable;
public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
+ TimelineMetricMetadataManager metricMetadataManager,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -56,6 +58,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
hostAggregatorDisabledParam, inputTableName, outputTableName,
nativeTimeRangeDelay, haController);
isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
+ readHelper = new TimelineMetricReadHelper(metricMetadataManager, true);
}
@Override
@@ -71,9 +74,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
}
condition.setStatement(sqlStr);
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("UUID");
condition.addOrderByColumn("SERVER_TIME");
return condition;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 34b1f9b..cae7263 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
*/
public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
public Long timeSliceIntervalMillis;
- private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
+ private TimelineMetricReadHelper timelineMetricReadHelper;
// Aggregator to perform app-level aggregates for host metrics
private final TimelineMetricAppAggregator appAggregator;
// 1 minute client side buffering adjustment
@@ -64,6 +64,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
private final boolean interpolationEnabled;
private TimelineMetricMetadataManager metadataManagerInstance;
private String skipAggrPatternStrings;
+ private final static String liveHostsMetricName = "live_hosts";
public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
TimelineMetricMetadataManager metadataManager,
@@ -88,6 +89,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
this.interpolationEnabled = Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
this.skipAggrPatternStrings = metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
+ this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager, true);
}
@Override
@@ -127,10 +129,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
condition.setStatement(String.format(GET_METRIC_SQL,
getQueryHint(startTime), METRICS_RECORD_TABLE_NAME));
// Retaining order of the row-key avoids client side merge sort.
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("HOSTNAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("UUID");
condition.addOrderByColumn("SERVER_TIME");
return condition;
}
@@ -212,7 +211,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
TimelineMetric metric, List<Long[]> timeSlices) {
// Create time slices
- TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId());
+ TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId(), metric.getInstanceId());
TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
if (metricMetadata != null && !metricMetadata.isSupportsAggregates()) {
@@ -285,8 +284,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
timelineMetric.getMetricName(),
timelineMetric.getAppId(),
timelineMetric.getInstanceId(),
- timestamp,
- timelineMetric.getType());
+ timestamp);
if (prevTimestamp < 0 || timestamp.equals(prevTimestamp)) {
Double newValue = metric.getValue();
@@ -346,8 +344,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
timelineMetric.getMetricName(),
timelineMetric.getAppId(),
timelineMetric.getInstanceId(),
- entry.getKey(),
- timelineMetric.getType());
+ entry.getKey());
timelineClusterMetricMap.put(clusterMetric, interpolatedValue);
} else {
@@ -404,8 +401,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
timelineMetric.getMetricName(),
timelineMetric.getAppId(),
timelineMetric.getInstanceId(),
- timeSlice[1],
- timelineMetric.getType());
+ timeSlice[1]);
LOG.debug("Interpolated value : " + interpolatedValue);
timelineClusterMetricMap.put(clusterMetric, interpolatedValue);
@@ -435,13 +431,15 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) {
TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
- "live_hosts", appHostsEntry.getKey(), null, timestamp, null);
+ liveHostsMetricName, appHostsEntry.getKey(), null, timestamp);
Integer numOfHosts = appHostsEntry.getValue().intValue();
MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate(
(double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts);
+ metadataManagerInstance.getUuid(timelineClusterMetric);
+
aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index a17433b..8f941e1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
@@ -38,9 +39,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
- TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+ TimelineMetricReadHelper readHelper;
public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
+ TimelineMetricMetadataManager metricMetadataManager,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -54,6 +56,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
tableName, outputTableName, nativeTimeRangeDelay, haController);
+ readHelper = new TimelineMetricReadHelper(metricMetadataManager, false);
}
@Override
@@ -74,11 +77,8 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
getQueryHint(startTime), tableName));
// Retaining order of the row-key avoids client side merge sort.
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("HOSTNAME");
+ condition.addOrderByColumn("UUID");
condition.addOrderByColumn("SERVER_TIME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
return condition;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index 672f85f..c8b5728 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -23,16 +23,17 @@ import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Map;
import java.util.TreeMap;
public class TimelineMetricReadHelper {
private boolean ignoreInstance = false;
+ private TimelineMetricMetadataManager metadataManagerInstance = null;
public TimelineMetricReadHelper() {}
@@ -40,6 +41,15 @@ public class TimelineMetricReadHelper {
this.ignoreInstance = ignoreInstance;
}
+ public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager) { // ignoreInstance keeps its field default (false)
+ this.metadataManagerInstance = timelineMetricMetadataManager; // stored as-is, no null check
+ }
+
+ public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager, boolean ignoreInstance) { // sets both the metadata manager and the instance-id handling flag
+ this.metadataManagerInstance = timelineMetricMetadataManager; // stored as-is, no null check
+ this.ignoreInstance = ignoreInstance; // when true, the read methods below null out / skip the instance id
+ }
+
public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
throws SQLException, IOException {
TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
@@ -51,15 +61,16 @@ public class TimelineMetricReadHelper {
public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(ResultSet rs,
Function f) throws SQLException, IOException {
+ byte[] uuid = rs.getBytes("UUID");
+ TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid);
Function function = (f != null) ? f : Function.DEFAULT_VALUE_FUNCTION;
SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
- rs.getString("METRIC_NAME") + function.getSuffix(),
- rs.getString("APP_ID"),
- rs.getString("INSTANCE_ID"),
- rs.getString("HOSTNAME"),
- rs.getLong("SERVER_TIME"),
+ timelineMetric.getMetricName() + function.getSuffix(),
+ timelineMetric.getAppId(),
+ timelineMetric.getInstanceId(),
+ timelineMetric.getHostName(),
rs.getLong("SERVER_TIME"),
- rs.getString("UNITS")
+ rs.getLong("SERVER_TIME")
);
double value;
@@ -91,16 +102,14 @@ public class TimelineMetricReadHelper {
*/
public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs)
throws SQLException {
- TimelineMetric metric = new TimelineMetric();
- metric.setMetricName(rs.getString("METRIC_NAME"));
- metric.setAppId(rs.getString("APP_ID"));
- if (!ignoreInstance) {
- metric.setInstanceId(rs.getString("INSTANCE_ID"));
+
+ byte[] uuid = rs.getBytes("UUID"); // the metric is now looked up from the UUID column instead of being built from METRIC_NAME/APP_ID/INSTANCE_ID/HOSTNAME
+ TimelineMetric metric = metadataManagerInstance.getMetricFromUuid(uuid); // NOTE(review): metadataManagerInstance is null when the helper is built via the no-arg or boolean-only constructor, and the result is dereferenced below without a null check - confirm neither can be null here
+ if (ignoreInstance) { // condition is inverted relative to the removed code: the instance id now arrives with the lookup and is cleared on request
+ metric.setInstanceId(null); // NOTE(review): mutates the object returned by the manager - confirm it is not a shared/cached instance
}
- metric.setHostName(rs.getString("HOSTNAME"));
metric.setTimestamp(rs.getLong("SERVER_TIME"));
metric.setStartTime(rs.getLong("START_TIME"));
- metric.setType(rs.getString("UNITS"));
return metric;
}
@@ -130,14 +139,16 @@ public class TimelineMetricReadHelper {
return agg;
}
-
public TimelineClusterMetric fromResultSet(ResultSet rs) throws SQLException {
+
+ byte[] uuid = rs.getBytes("UUID"); // metric name, app id and instance id are taken from the UUID lookup below, no longer from row columns
+ TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid); // NOTE(review): dereferenced below without a null check - confirm the manager is set and the lookup cannot return null
+
return new TimelineClusterMetric(
- rs.getString("METRIC_NAME"),
- rs.getString("APP_ID"),
- ignoreInstance ? null : rs.getString("INSTANCE_ID"),
- rs.getLong("SERVER_TIME"),
- rs.getString("UNITS"));
+ timelineMetric.getMetricName(),
+ timelineMetric.getAppId(),
+ ignoreInstance ? null : timelineMetric.getInstanceId(), // instance id is passed as null when ignoreInstance is set
+ rs.getLong("SERVER_TIME")); // timestamp still comes from the row; the UNITS argument of the removed call is gone
}
public MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
@@ -154,14 +165,8 @@ public class TimelineMetricReadHelper {
public TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
throws SQLException, IOException {
- TimelineMetric metric = new TimelineMetric();
- metric.setMetricName(rs.getString("METRIC_NAME"));
- metric.setAppId(rs.getString("APP_ID"));
- metric.setInstanceId(rs.getString("INSTANCE_ID"));
- metric.setHostName(rs.getString("HOSTNAME"));
- metric.setTimestamp(rs.getLong("SERVER_TIME"));
- metric.setType(rs.getString("UNITS"));
- return metric;
+ byte[] uuid = rs.getBytes("UUID"); // only the UUID column of the row is read
+ return metadataManagerInstance.getMetricFromUuid(uuid); // returned as-is: unlike the removed code, nothing here copies SERVER_TIME or UNITS from the row onto the metric. NOTE(review): confirm callers do not rely on the timestamp, and that the manager is non-null
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java
new file mode 100644
index 0000000..06e9279
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class TimelineMetricHostMetadata { // mutable holder pairing a set of hosted app names with a uuid byte array
+ private Set<String> hostedApps = new HashSet<>(); // starts as an empty HashSet; presumably the app ids seen on one host - TODO confirm against callers
+ private byte[] uuid; // no initializer, so null until setUuid is called
+
+ // No-arg constructor: hostedApps stays the empty set initialised above and uuid stays null.
+ public TimelineMetricHostMetadata() {
+ }
+
+ public TimelineMetricHostMetadata(Set<String> hostedApps) { // uuid is left null by this constructor
+ this.hostedApps = hostedApps; // keeps the caller's set by reference (no copy, no null check)
+ }
+
+ public Set<String> getHostedApps() {
+ return hostedApps; // returns the internal set itself, so changes made by the caller are visible here
+ }
+
+ public void setHostedApps(Set<String> hostedApps) {
+ this.hostedApps = hostedApps; // replaces the set by reference (no copy, no null check)
+ }
+
+ public byte[] getUuid() {
+ return uuid; // returns the stored array itself, not a copy
+ }
+
+ public void setUuid(byte[] uuid) {
+ this.uuid = uuid; // stores the caller's array by reference
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java
index 504b502..6aeb2dd 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java
@@ -17,13 +17,20 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
public class TimelineMetricMetadataKey {
String metricName;
String appId;
+ String instanceId;
- public TimelineMetricMetadataKey(String metricName, String appId) {
+ public TimelineMetricMetadataKey(String metricName, String appId, String instanceId) {
this.metricName = metricName;
this.appId = appId;
+ this.instanceId = instanceId;
}
public String getMetricName() {
@@ -34,6 +41,10 @@ public class TimelineMetricMetadataKey {
return appId;
}
+ public String getInstanceId() {
+ return instanceId;
+ }
+
public void setAppId(String appId) {
this.appId = appId;
}
@@ -46,15 +57,24 @@ public class TimelineMetricMetadataKey {
TimelineMetricMetadataKey that = (TimelineMetricMetadataKey) o;
if (!metricName.equals(that.metricName)) return false;
- return !(appId != null ? !appId.equals(that.appId) : that.appId != null);
-
+ if (appId != null ? !appId.equals(that.appId) : that.appId != null) return false; // null-safe like the line it replaces; hashCode() also tolerates a null appId
+ return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId)); // a null and an empty instanceId compare as equal
}
@Override
public int hashCode() {
int result = metricName.hashCode();
result = 31 * result + (appId != null ? appId.hashCode() : 0);
+ result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
return result;
}
+ @Override
+ public String toString() {
+ return "TimelineMetricMetadataKey{" +
+ "metricName='" + metricName + '\'' +
+ ", appId='" + appId + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ '}';
+ }
}