You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2017/05/17 20:51:54 UTC
[20/25] ambari git commit: AMBARI-20758 Aggregate local metrics for
minute aggregation time window (dsen)
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
index c0feed5..e5da9ba 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
@@ -27,6 +27,9 @@ from event_definition import HostMetricCollectEvent, ProcessMetricCollectEvent
from metric_collector import MetricsCollector
from emitter import Emitter
from host_info import HostInfo
+from aggregator import Aggregator
+from aggregator import AggregatorWatchdog
+
logger = logging.getLogger()
@@ -50,11 +53,15 @@ class Controller(threading.Thread):
self.initialize_events_cache()
self.emitter = Emitter(self.config, self.application_metric_map, stop_handler)
self._t = None
+ self.aggregator = None
+ self.aggregator_watchdog = None
def run(self):
logger.info('Running Controller thread: %s' % threading.currentThread().getName())
self.start_emitter()
+ if self.config.is_inmemory_aggregation_enabled():
+ self.start_aggregator_with_watchdog()
# Wake every 5 seconds to push events to the queue
while True:
@@ -62,6 +69,10 @@ class Controller(threading.Thread):
logger.warn('Event Queue full!! Suspending further collections.')
else:
self.enqueque_events()
+ # restart aggregator if needed
+ if self.config.is_inmemory_aggregation_enabled() and not self.aggregator_watchdog.is_ok():
+ logger.warning("Aggregator is not available. Restarting aggregator.")
+ self.start_aggregator_with_watchdog()
pass
# Wait for the service stop event instead of sleeping blindly
if 0 == self._stop_handler.wait(self.sleep_interval):
@@ -75,6 +86,12 @@ class Controller(threading.Thread):
# The emitter thread should have stopped by now, just ensure it has shut
# down properly
self.emitter.join(5)
+
+ if self.config.is_inmemory_aggregation_enabled():
+ self.aggregator.stop()
+ self.aggregator_watchdog.stop()
+ self.aggregator.join(5)
+ self.aggregator_watchdog.join(5)
pass
# TODO: Optimize to not use Timer class and use the Queue instead
@@ -115,3 +132,14 @@ class Controller(threading.Thread):
def start_emitter(self):
self.emitter.start()
+
+ # Start aggregator and watcher threads
+ def start_aggregator_with_watchdog(self):
+ if self.aggregator:
+ self.aggregator.stop()
+ if self.aggregator_watchdog:
+ self.aggregator.stop()
+ self.aggregator = Aggregator(self.config, self._stop_handler)
+ self.aggregator_watchdog = AggregatorWatchdog(self.config, self._stop_handler)
+ self.aggregator.start()
+ self.aggregator_watchdog.start()
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index e2a7f0d..77b8c23 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -44,10 +44,16 @@ class Emitter(threading.Thread):
self._stop_handler = stop_handler
self.application_metric_map = application_metric_map
self.collector_port = config.get_server_port()
- self.all_metrics_collector_hosts = config.get_metrics_collector_hosts()
+ self.all_metrics_collector_hosts = config.get_metrics_collector_hosts_as_list()
self.is_server_https_enabled = config.is_server_https_enabled()
self.set_instanceid = config.is_set_instanceid()
self.instanceid = config.get_instanceid()
+ self.is_inmemory_aggregation_enabled = config.is_inmemory_aggregation_enabled()
+
+ if self.is_inmemory_aggregation_enabled:
+ self.collector_port = config.get_inmemory_aggregation_port()
+ self.all_metrics_collector_hosts = ['localhost']
+ self.is_server_https_enabled = False
if self.is_server_https_enabled:
self.ca_certs = config.get_ca_certs()
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
index bfb6957..7a9fbec 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
@@ -117,7 +117,8 @@ class StopHandlerLinux(StopHandler):
def wait(self, timeout=None):
# Stop process when stop event received
- if self.stop_event.wait(timeout):
+ self.stop_event.wait(timeout)
+ if self.stop_event.isSet():
logger.info("Stop event received")
return 0
# Timeout
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
index d218015..53d27f8 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
@@ -21,7 +21,7 @@ limitations under the License.
import logging
import os
import sys
-
+import signal
from ambari_commons.os_utils import remove_file
from core.controller import Controller
@@ -73,6 +73,10 @@ def server_process_main(stop_handler, scmStatus=None):
if scmStatus is not None:
scmStatus.reportStarted()
+ # For some reason this is needed to catch system signals like SIGTERM
+ # TODO fix if possible
+ signal.pause()
+
#The controller thread finishes when the stop event is signaled
controller.join()
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index 211e9cd..76b1c15 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -72,6 +72,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY;
private static final String TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + INSTANCE_ID_PROPERTY;
private static final String TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SET_INSTANCE_ID_PROPERTY;
+ private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY;
+ private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY;
private static final String TIMELINE_DEFAULT_HOST = "localhost";
private static final String TIMELINE_DEFAULT_PORT = "6188";
private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
@@ -96,6 +98,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
private String[] includedMetricsPrefixes;
// Local cache to avoid prefix matching everytime
private Set<String> excludedMetrics = new HashSet<>();
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
@Override
protected String getCollectorUri(String host) {
@@ -132,6 +136,17 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
return hostname;
}
+
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
public void setMetricsCache(TimelineMetricsCache metricsCache) {
this.metricsCache = metricsCache;
}
@@ -169,6 +184,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
instanceId = props.getString(TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY);
setInstanceId = props.getBoolean(TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY);
+ hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
+ hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
if (metricCollectorProtocol.contains("https")) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 08f0598..24b2c8b 100644
--- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -55,6 +55,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
private NimbusClient nimbusClient;
private String applicationId;
private int timeoutSeconds;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
public StormTimelineMetricsReporter() {
@@ -96,6 +98,16 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Map conf) {
LOG.info("Preparing Storm Metrics Reporter");
try {
@@ -130,6 +142,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
applicationId = cf.get(APP_ID).toString();
setInstanceId = Boolean.getBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString());
instanceId = cf.get(INSTANCE_ID_PROPERTY).toString();
+ hostInMemoryAggregationEnabled = Boolean.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString());
+ hostInMemoryAggregationPort = Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString());
collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
if (protocol.contains("https")) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 20f60e1..c9c0538 100644
--- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -61,6 +61,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
private String applicationId;
private boolean setInstanceId;
private String instanceId;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
@Override
protected String getCollectorUri(String host) {
@@ -98,6 +100,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
LOG.info("Preparing Storm Metrics Sink");
try {
@@ -126,6 +138,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
// Initialize the collector write strategy
super.init();
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 14f160b..5b75065 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -50,6 +50,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
private String instanceId;
private String applicationId;
private int timeoutSeconds;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
public StormTimelineMetricsReporter() {
@@ -91,6 +93,16 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Object registrationArgument) {
LOG.info("Preparing Storm Metrics Reporter");
try {
@@ -119,6 +131,10 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY));
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
+
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+
if (protocol.contains("https")) {
String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 425201c..320e177 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -70,6 +70,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
private String applicationId;
private String instanceId;
private boolean setInstanceId;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
@Override
protected String getCollectorUri(String host) {
@@ -107,6 +109,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
LOG.info("Preparing Storm Metrics Sink");
try {
@@ -137,6 +149,10 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
port = configuration.getProperty(COLLECTOR_PORT, "6188");
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+
// Initialize the collector write strategy
super.init();
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index c242a2f..f984253 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -24,10 +24,13 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
import org.apache.hadoop.service.AbstractService;
@@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
@@ -62,6 +66,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
@@ -152,10 +157,14 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
scheduleAggregatorThread(dailyClusterAggregator);
// Start the minute host aggregator
- TimelineMetricAggregator minuteHostAggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
- hBaseAccessor, metricsConf, haController);
- scheduleAggregatorThread(minuteHostAggregator);
+ if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) {
+ LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector");
+ } else {
+ TimelineMetricAggregator minuteHostAggregator =
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+ hBaseAccessor, metricsConf, haController);
+ scheduleAggregatorThread(minuteHostAggregator);
+ }
// Start the hourly host aggregator
TimelineMetricAggregator hourlyHostAggregator =
@@ -390,6 +399,18 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
}
@Override
+ public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+ Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+ for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
+ aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
+ }
+ hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+
+ return new TimelinePutResponse();
+ }
+
+ @Override
public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
throws SQLException, IOException {
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index fb369e8..3b2a119 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -40,8 +42,6 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 0d5042f..023465b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -296,6 +296,8 @@ public class TimelineMetricConfiguration {
public static final String AMSHBASE_METRICS_WHITESLIST_FILE = "amshbase_metrics_whitelist";
+ public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = "timeline.metrics.host.inmemory.aggregation";
+
private Configuration hbaseConf;
private Configuration metricsConf;
private Configuration amsEnvConf;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index bde09cb..d052d54 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -80,6 +81,7 @@ public interface TimelineMetricStore {
*/
Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException;
+ TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException;
/**
* Returns all hosts that have written metrics with the apps on the host
* @return { hostname : [ appIds ] }
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
index 65d54c0..7b03b30 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
@@ -19,10 +19,10 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import java.util.Map;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
deleted file mode 100644
index 825ac25..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.annotate.JsonSubTypes;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import java.io.IOException;
-
-/**
-*
-*/
-@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
- @JsonSubTypes.Type(value = MetricHostAggregate.class)})
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class MetricAggregate {
- private static final ObjectMapper mapper = new ObjectMapper();
-
- protected Double sum = 0.0;
- protected Double deviation;
- protected Double max = Double.MIN_VALUE;
- protected Double min = Double.MAX_VALUE;
-
- public MetricAggregate() {
- }
-
- MetricAggregate(Double sum, Double deviation, Double max,
- Double min) {
- this.sum = sum;
- this.deviation = deviation;
- this.max = max;
- this.min = min;
- }
-
- public void updateSum(Double sum) {
- this.sum += sum;
- }
-
- public void updateMax(Double max) {
- if (max > this.max) {
- this.max = max;
- }
- }
-
- public void updateMin(Double min) {
- if (min < this.min) {
- this.min = min;
- }
- }
-
- @JsonProperty("sum")
- public Double getSum() {
- return sum;
- }
-
- @JsonProperty("deviation")
- public Double getDeviation() {
- return deviation;
- }
-
- @JsonProperty("max")
- public Double getMax() {
- return max;
- }
-
- @JsonProperty("min")
- public Double getMin() {
- return min;
- }
-
- public void setSum(Double sum) {
- this.sum = sum;
- }
-
- public void setDeviation(Double deviation) {
- this.deviation = deviation;
- }
-
- public void setMax(Double max) {
- this.max = max;
- }
-
- public void setMin(Double min) {
- this.min = min;
- }
-
- public String toJSON() throws IOException {
- return mapper.writeValueAsString(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
deleted file mode 100644
index 9c837b6..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
-*
-*/
-public class MetricClusterAggregate extends MetricAggregate {
- private int numberOfHosts;
-
- @JsonCreator
- public MetricClusterAggregate() {
- }
-
- public MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
- Double max, Double min) {
- super(sum, deviation, max, min);
- this.numberOfHosts = numberOfHosts;
- }
-
- @JsonProperty("numberOfHosts")
- public int getNumberOfHosts() {
- return numberOfHosts;
- }
-
- public void updateNumberOfHosts(int count) {
- this.numberOfHosts += count;
- }
-
- public void setNumberOfHosts(int numberOfHosts) {
- this.numberOfHosts = numberOfHosts;
- }
-
- /**
- * Find and update min, max and avg for a minute
- */
- public void updateAggregates(MetricClusterAggregate hostAggregate) {
- updateMax(hostAggregate.getMax());
- updateMin(hostAggregate.getMin());
- updateSum(hostAggregate.getSum());
- updateNumberOfHosts(hostAggregate.getNumberOfHosts());
- }
-
- @Override
- public String toString() {
- return "MetricAggregate{" +
- "sum=" + sum +
- ", numberOfHosts=" + numberOfHosts +
- ", deviation=" + deviation +
- ", max=" + max +
- ", min=" + min +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
deleted file mode 100644
index 340ec75..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Represents a collection of minute based aggregation of values for
- * resolution greater than a minute.
- */
-public class MetricHostAggregate extends MetricAggregate {
-
- private long numberOfSamples = 0;
-
- @JsonCreator
- public MetricHostAggregate() {
- super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
- }
-
- public MetricHostAggregate(Double sum, int numberOfSamples,
- Double deviation,
- Double max, Double min) {
- super(sum, deviation, max, min);
- this.numberOfSamples = numberOfSamples;
- }
-
- @JsonProperty("numberOfSamples")
- public long getNumberOfSamples() {
- return numberOfSamples == 0 ? 1 : numberOfSamples;
- }
-
- public void updateNumberOfSamples(long count) {
- this.numberOfSamples += count;
- }
-
- public void setNumberOfSamples(long numberOfSamples) {
- this.numberOfSamples = numberOfSamples;
- }
-
- public double getAvg() {
- return sum / numberOfSamples;
- }
-
- /**
- * Find and update min, max and avg for a minute
- */
- public void updateAggregates(MetricHostAggregate hostAggregate) {
- updateMax(hostAggregate.getMax());
- updateMin(hostAggregate.getMin());
- updateSum(hostAggregate.getSum());
- updateNumberOfSamples(hostAggregate.getNumberOfSamples());
- }
-
- @Override
- public String toString() {
- return "MetricHostAggregate{" +
- "sum=" + sum +
- ", numberOfSamples=" + numberOfSamples +
- ", deviation=" + deviation +
- ", max=" + max +
- ", min=" + min +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
index 44aca03..9eaf456 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsFilter;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 0934356..ba16b43 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index a5a3499..34b1f9b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -38,6 +38,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index 0ea9c08..a17433b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index b5f49fb..672f85f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 9da921a..50cfb08 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
@@ -285,6 +286,36 @@ public class TimelineWebServices {
}
}
+ /**
+ * Store the given metrics into the timeline store, and return errors that
+ * happened during storing.
+ */
+ @Path("/metrics/aggregated")
+ @POST
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelinePutResponse postAggregatedMetrics(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ AggregationResult metrics) {
+
+ init(res);
+ if (metrics == null) {
+ return new TimelinePutResponse();
+ }
+
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing aggregated metrics: " +
+ TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
+ }
+
+ return timelineMetricStore.putHostAggregatedMetrics(metrics);
+ } catch (Exception e) {
+ LOG.error("Error saving metrics.", e);
+ throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
@Path("/containermetrics")
@POST
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 0087fd9..d5baaef 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -26,12 +26,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
index 37ec134..7eeb9c4 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
@@ -17,9 +17,9 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import java.util.Arrays;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index a910cc2..d668178 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -22,11 +22,11 @@ import com.google.common.collect.Multimap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
index 44f48e8..3009163 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
.timeline;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -34,7 +34,7 @@ public class TestMetricHostAggregate {
assertThat(aggregate.getSum()).isEqualTo(3.0);
assertThat(aggregate.getMin()).isEqualTo(1.0);
assertThat(aggregate.getMax()).isEqualTo(2.0);
- assertThat(aggregate.getAvg()).isEqualTo(3.0 / 2);
+ assertThat(aggregate.calculateAverage()).isEqualTo(3.0 / 2);
}
@Test
@@ -50,7 +50,7 @@ public class TestMetricHostAggregate {
assertThat(aggregate.getSum()).isEqualTo(12.0);
assertThat(aggregate.getMin()).isEqualTo(0.5);
assertThat(aggregate.getMax()).isEqualTo(7.5);
- assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
+ assertThat(aggregate.calculateAverage()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
}
static MetricHostAggregate createAggregate (Double sum, Double min,
@@ -63,4 +63,4 @@ public class TestMetricHostAggregate {
aggregate.setNumberOfSamples(samplesCount);
return aggregate;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index f00906e..ac2f9d7 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -92,6 +93,11 @@ public class TestTimelineMetricStore implements TimelineMetricStore {
}
@Override
+ public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+ return null;
+ }
+
+ @Override
public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
return Collections.emptyMap();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
index fa0cfe9..53f6f6c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
@@ -17,10 +17,10 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import java.util.Collections;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
index f083731..07fd85d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
index 9873643..75b3f91 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
@@ -124,14 +125,14 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else if ("mem_free".equals(currentMetric.getMetricName())) {
assertEquals(2.0, currentHostAggregate.getMax());
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else {
fail("Unexpected entry");
@@ -198,7 +199,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
assertEquals(12 * 15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
}
}
}
@@ -260,7 +261,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
assertEquals(12 * 15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
}
}
}
@@ -309,14 +310,14 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else if ("mem_free".equals(currentMetric.getMetricName())) {
assertEquals(2.0, currentHostAggregate.getMax());
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else {
fail("Unexpected entry");
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index 78db11d..6541b2c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml
index 2d88912..02f9574 100644
--- a/ambari-metrics/pom.xml
+++ b/ambari-metrics/pom.xml
@@ -33,6 +33,7 @@
<module>ambari-metrics-host-monitoring</module>
<module>ambari-metrics-grafana</module>
<module>ambari-metrics-assembly</module>
+ <module>ambari-metrics-host-aggregator</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
index 8d1f63f..a0765bf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
@@ -300,6 +300,16 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
return hostName;
}
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return false;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 0;
+ }
+
private List<TimelineMetric> getFilteredMetricList(List<SingleMetric> metrics) {
final List<TimelineMetric> metricList = new ArrayList<>();
for (SingleMetric metric : metrics) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
index 150b0a8..5d21514 100644
--- a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
@@ -153,6 +153,8 @@ if has_metric_collector:
pass
metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
# if accumulo is selected accumulo_tserver_hosts should not be empty, but still default just in case
if 'slave_hosts' in config['clusterHostInfo']:
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2 b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
index 6873c85..742ea3c 100644
--- a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
+++ b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
@@ -16,6 +16,9 @@
# Poll collectors every {{metrics_report_interval}} seconds
*.period={{metrics_collection_period}}
+*.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+
{% if has_metric_collector %}
*.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
index cb66537..4d33661 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
@@ -101,6 +101,14 @@
<on-ambari-upgrade add="true"/>
</property>
<property>
+ <name>timeline.metrics.host.inmemory.aggregation.jvm.arguments</name>
+ <value>-Xmx256m -Xms128m -XX:PermSize=68m</value>
+ <description>
+ Local aggregator jvm extra arguments separated with spaces
+ </description>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
<name>timeline.metrics.skip.network.interfaces.patterns</name>
<value>None</value>
<description>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index 8e1671e..1b085f6 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -787,4 +787,15 @@
<value>{{cluster_zookeeper_clientPort}}</value>
<on-ambari-upgrade add="true"/>
</property>
+ <property>
+ <name>timeline.metrics.host.inmemory.aggregation</name>
+ <value>false</value>
+ <description>if set to "true" host metrics will be aggregated in memory on each host</description>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
+ <name>timeline.metrics.host.inmemory.aggregation.port</name>
+ <value>61888</value>
+ <on-ambari-upgrade add="true"/>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
index 740a91a..9031b46 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
@@ -93,6 +93,9 @@
<primary>true</primary>
</log>
</logs>
+ <configuration-dependencies>
+ <config-type>ams-site</config-type>
+ </configuration-dependencies>
</component>
<component>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
index a929847..f49d47d 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
@@ -163,6 +163,20 @@ def ams(name=None):
create_parents = True
)
+ if params.host_in_memory_aggregation and params.log4j_props is not None:
+ File(os.path.join(params.ams_monitor_conf_dir, "log4j.properties"),
+ owner=params.ams_user,
+ content=params.log4j_props
+ )
+
+ XmlConfig("ams-site.xml",
+ conf_dir=params.ams_monitor_conf_dir,
+ configurations=params.config['configurations']['ams-site'],
+ configuration_attributes=params.config['configuration_attributes']['ams-site'],
+ owner=params.ams_user,
+ group=params.user_group
+ )
+
TemplateConfig(
os.path.join(params.ams_monitor_conf_dir, "metric_monitor.ini"),
owner=params.ams_user,
@@ -366,6 +380,22 @@ def ams(name=None, action=None):
create_parents = True
)
+ if params.host_in_memory_aggregation and params.log4j_props is not None:
+ File(format("{params.ams_monitor_conf_dir}/log4j.properties"),
+ mode=0644,
+ group=params.user_group,
+ owner=params.ams_user,
+ content=InlineTemplate(params.log4j_props)
+ )
+
+ XmlConfig("ams-site.xml",
+ conf_dir=params.ams_monitor_conf_dir,
+ configurations=params.config['configurations']['ams-site'],
+ configuration_attributes=params.config['configuration_attributes']['ams-site'],
+ owner=params.ams_user,
+ group=params.user_group
+ )
+
Execute(format("{sudo} chown -R {ams_user}:{user_group} {ams_monitor_log_dir}")
)
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index 50dde1c..b8c14f4 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -224,6 +224,11 @@ metrics_collector_heapsize = check_append_heap_property(str(metrics_collector_he
master_heapsize = check_append_heap_property(str(master_heapsize), "m")
regionserver_heapsize = check_append_heap_property(str(regionserver_heapsize), "m")
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+host_in_memory_aggregation_jvm_arguments = default("/configurations/ams-env/timeline.metrics.host.inmemory.aggregation.jvm.arguments",
+ "-Xmx256m -Xms128m -XX:PermSize=68m")
+
regionserver_xmn_max = default('/configurations/ams-hbase-env/hbase_regionserver_xmn_max', None)
if regionserver_xmn_max:
regionserver_xmn_max = int(trim_heap_property(str(regionserver_xmn_max), "m"))
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
index 9729bbe..bb0db4f 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
@@ -58,6 +58,9 @@ rpc.protocol={{metric_collector_protocol}}
*.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar
*.sink.timeline.slave.host.name={{hostname}}
+*.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+
hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
hbase.sink.timeline.period={{metrics_collection_period}}
hbase.sink.timeline.sendInterval={{metrics_report_interval}}000
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
index 769ad67..b7dee50 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
@@ -38,3 +38,10 @@ failover_strategy = {{failover_strategy}}
failover_strategy_blacklisted_interval_seconds = {{failover_strategy_blacklisted_interval_seconds}}
port = {{metric_collector_port}}
https_enabled = {{metric_collector_https_enabled}}
+
+[aggregation]
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+java_home = {{java64_home}}
+jvm_arguments = {{host_in_memory_aggregation_jvm_arguments}}
+ams_monitor_log_dir = {{ams_monitor_log_dir}}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
index 86a290f..0e0c9aa 100644
--- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
@@ -124,6 +124,9 @@ if has_metric_collector:
metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
# Cluster Zookeeper quorum
zookeeper_quorum = None
if not len(default("/clusterHostInfo/zookeeper_hosts", [])) == 0: