You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2016/03/30 20:35:28 UTC
[2/2] ambari git commit: AMBARI-15623. Support distributed aggregation for multiple AMS instances. (swagle)
AMBARI-15623. Support distributed aggregation for multiple AMS instances. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dfa4454e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dfa4454e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dfa4454e
Branch: refs/heads/trunk
Commit: dfa4454e7d69fb160924a3877d3f3f1a6314c7bd
Parents: c479fdd
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Wed Mar 30 11:35:07 2016 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Wed Mar 30 11:35:07 2016 -0700
----------------------------------------------------------------------
.../ambari-metrics-timelineservice/pom.xml | 29 +-
.../timeline/HBaseTimelineMetricStore.java | 70 ++++-
.../timeline/TimelineMetricConfiguration.java | 46 ++++
.../metrics/timeline/TimelineMetricStore.java | 6 +
.../aggregators/AbstractTimelineAggregator.java | 106 +++++--
.../aggregators/TimelineMetricAggregator.java | 26 +-
.../TimelineMetricAggregatorFactory.java | 100 ++++---
.../TimelineMetricClusterAggregator.java | 9 +-
.../TimelineMetricClusterAggregatorSecond.java | 13 +-
.../TimelineMetricHostAggregator.java | 11 +-
.../v2/TimelineMetricClusterAggregator.java | 11 +-
.../v2/TimelineMetricHostAggregator.java | 10 +-
.../availability/AggregationTaskRunner.java | 144 ++++++++++
.../availability/CheckpointManager.java | 98 +++++++
.../OnlineOfflineStateModelFactory.java | 69 +++++
.../TimelineMetricHAController.java | 276 +++++++++++++++++++
.../query/DefaultPhoenixDataSource.java | 2 +-
.../webapp/TimelineWebServices.java | 12 +
.../timeline/ITPhoenixHBaseAccessor.java | 12 +-
.../timeline/TestTimelineMetricStore.java | 5 +
.../AbstractTimelineAggregatorTest.java | 7 +-
.../aggregators/ITClusterAggregator.java | 20 +-
.../aggregators/ITMetricAggregator.java | 19 +-
...melineMetricClusterAggregatorSecondTest.java | 9 +-
.../TimelineMetricHAControllerTest.java | 107 +++++++
.../server/upgrade/UpgradeCatalog240.java | 33 ++-
.../0.1.0/configuration/ams-env.xml | 3 +
.../AMBARI_METRICS/0.1.0/metainfo.xml | 2 +-
.../server/upgrade/UpgradeCatalog240Test.java | 3 +
29 files changed, 1112 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index b435964..f180715 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -34,9 +34,9 @@
<!-- Needed for generating FindBugs warnings using parent pom -->
<!--<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>-->
<protobuf.version>2.5.0</protobuf.version>
- <hadoop.version>2.7.1.2.3.4.0-3347</hadoop.version>
- <phoenix.version>4.4.0.2.3.4.0-3347</phoenix.version>
- <hbase.version>1.1.2.2.3.4.0-3347</hbase.version>
+ <hadoop.version>[2.7.1.2.5.0.0,2.7.1.2.5.0.0-9999)</hadoop.version>
+ <phoenix.version>[4.4.0.2.5.0.0,4.4.0.2.5.0.0-9999)</phoenix.version>
+ <hbase.version>[1.1.2.2.5.0.0,1.1.2.2.5.0.0-9999)</hbase.version>
</properties>
<build>
@@ -249,6 +249,25 @@
</build>
<dependencies>
+
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6.5</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ <version>3.4.8</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
@@ -565,6 +584,10 @@
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index a32e206..a5204e1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
@@ -46,11 +48,11 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore {
@@ -58,8 +60,10 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
private final TimelineMetricConfiguration configuration;
private PhoenixHBaseAccessor hBaseAccessor;
private static volatile boolean isInitialized = false;
- private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor();
+ private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>();
private TimelineMetricMetadataManager metricMetadataManager;
+ private TimelineMetricHAController haController;
/**
* Construct the service.
@@ -87,6 +91,18 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
metricMetadataManager.initializeMetadata();
// Initialize policies before TTL update
hBaseAccessor.initPoliciesAndTTL();
+ // Start HA service
+ if (configuration.isDistributedOperationModeEnabled()) {
+ // Start the controller
+ haController = new TimelineMetricHAController(configuration);
+ try {
+ haController.initializeHAController();
+ } catch (Exception e) {
+ LOG.error(e);
+ throw new MetricsSystemInitializationException("Unable to " +
+ "initialize HA controller", e);
+ }
+ }
if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
@@ -94,46 +110,53 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
// Start the cluster aggregator second
TimelineMetricAggregator secondClusterAggregator =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager);
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
scheduleAggregatorThread(secondClusterAggregator);
// Start the minute cluster aggregator
TimelineMetricAggregator minuteClusterAggregator =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(minuteClusterAggregator);
// Start the hourly cluster aggregator
TimelineMetricAggregator hourlyClusterAggregator =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(hourlyClusterAggregator);
// Start the daily cluster aggregator
TimelineMetricAggregator dailyClusterAggregator =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(dailyClusterAggregator);
// Start the minute host aggregator
TimelineMetricAggregator minuteHostAggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(minuteHostAggregator);
// Start the hourly host aggregator
TimelineMetricAggregator hourlyHostAggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(hourlyHostAggregator);
// Start the daily host aggregator
TimelineMetricAggregator dailyHostAggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(dailyHostAggregator);
if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
int delay = configuration.getTimelineMetricsServiceWatcherDelay();
// Start the watchdog
- executorService.scheduleWithFixedDelay(
- new TimelineMetricStoreWatcher(this, configuration), initDelay, delay,
- TimeUnit.SECONDS);
+ watchdogExecutorService.scheduleWithFixedDelay(
+ new TimelineMetricStoreWatcher(this, configuration),
+ initDelay, delay, TimeUnit.SECONDS);
LOG.info("Started watchdog for timeline metrics store with initial " +
"delay = " + initDelay + ", delay = " + delay);
}
@@ -333,13 +356,30 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
return metricMetadataManager.getHostedAppsCache();
}
- private void scheduleAggregatorThread(TimelineMetricAggregator aggregator) {
- ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ @Override
+ public List<String> getLiveInstances() { // haController is only created when distributed operation mode is enabled
+ return haController != null ? haController.getLiveInstanceHostNames() : java.util.Collections.<String>emptyList();
+ }
+
+ private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) { // one named single-thread scheduler per enabled aggregator
if (!aggregator.isDisabled()) {
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName())); // NOTE(review): assumes every AGGREGATOR_NAME has a mapped thread name
+ }
+ }
+ );
+ scheduledExecutors.put(aggregator.getName(), executorService); // keep a handle per aggregator name
executorService.scheduleAtFixedRate(aggregator,
0l,
aggregator.getSleepIntervalMillis(),
TimeUnit.MILLISECONDS);
+ LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
+ aggregator.getSleepIntervalMillis() + " milliseconds.");
+ } else {
+ LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index e57f02d..90c1d78 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -23,9 +23,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
+import java.net.UnknownHostException;
/**
* Configuration class that reads properties from ams-site.xml. All values
@@ -38,6 +40,7 @@ public class TimelineMetricConfiguration {
public static final String HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml";
public static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+ public static final String METRICS_ENV_CONFIGURATION_FILE = "ams-env.xml";
public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
"timeline.metrics.aggregator.checkpoint.dir";
@@ -221,8 +224,11 @@ public class TimelineMetricConfiguration {
public static final String HOST_APP_ID = "HOST";
+ public static final String DEFAULT_INSTANCE_PORT = "12001";
+
private Configuration hbaseConf;
private Configuration metricsConf;
+ private Configuration amsEnvConf;
private volatile boolean isInitialized = false;
public void initialize() throws URISyntaxException, MalformedURLException {
@@ -249,6 +255,7 @@ public class TimelineMetricConfiguration {
hbaseConf.addResource(hbaseResUrl.toURI().toURL());
metricsConf = new Configuration(true);
metricsConf.addResource(amsResUrl.toURI().toURL());
+
isInitialized = true;
}
@@ -266,6 +273,37 @@ public class TimelineMetricConfiguration {
return metricsConf;
}
+ public String getZKClientPort() throws MalformedURLException, URISyntaxException {
+ if (!isInitialized) {
+ initialize();
+ }
+ return hbaseConf.getTrimmed("hbase.zookeeper.property.clientPort", "2181");
+ }
+
+ public String getZKQuorum() throws MalformedURLException, URISyntaxException {
+ if (!isInitialized) {
+ initialize();
+ }
+ return hbaseConf.getTrimmed("hbase.zookeeper.quorum");
+ }
+
+ public String getInstanceHostnameFromEnv() throws UnknownHostException {
+ String amsInstanceName = System.getProperty("AMS_INSTANCE_NAME");
+ if (amsInstanceName == null) {
+ amsInstanceName = InetAddress.getLocalHost().getHostName();
+ }
+ return amsInstanceName;
+ }
+
+ public String getInstancePort() throws MalformedURLException, URISyntaxException { // precedence: -DAMS_INSTANCE_PORT, then ams-site, then DEFAULT_INSTANCE_PORT
+ String amsInstancePort = System.getProperty("AMS_INSTANCE_PORT");
+ if (amsInstancePort == null) {
+ // No system property override: fall back to ams-site, then the default port
+ return getMetricsConf().get("timeline.metrics.availability.instance.port", DEFAULT_INSTANCE_PORT);
+ }
+ return amsInstancePort; // honor the explicit system property instead of discarding it
+ }
+
public String getWebappAddress() {
String defaultHttpAddress = "0.0.0.0:6188";
if (metricsConf != null) {
@@ -323,4 +361,12 @@ public class TimelineMetricConfiguration {
}
return defaultRpcAddress;
}
+
+ public boolean isDistributedOperationModeEnabled() { // true only when timeline.metrics.service.operation.mode == "distributed"
+ try {
+ return "distributed".equals(getMetricsConf().get("timeline.metrics.service.operation.mode")); // null-safe: unset mode -> false
+ } catch (Exception e) { // config could not be loaded; fall back to non-distributed mode
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index 0aa102e..2f08f3f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -87,4 +87,10 @@ public interface TimelineMetricStore {
* @throws IOException
*/
Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException;
+
+ /**
+ * Return a list of known live collector nodes
+ * @return [ hostname ]
+ */
+ List<String> getLiveInstances();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index ba7807b..ae87cf1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.slf4j.LoggerFactory;
@@ -34,6 +37,7 @@ import java.util.Date;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
/**
* Base class for all runnable aggregators. Provides common functions like
@@ -52,11 +56,12 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
protected String tableName;
protected String outputTableName;
protected Long nativeTimeRangeDelay;
+ protected AggregationTaskRunner taskRunner;
// Explicitly name aggregators for logging needs
- private final String aggregatorName;
+ private final AGGREGATOR_NAME aggregatorName;
- AbstractTimelineAggregator(String aggregatorName,
+ AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf) {
this.aggregatorName = aggregatorName;
@@ -64,10 +69,10 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
this.metricsConf = metricsConf;
this.checkpointDelayMillis = SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
- this.LOG = LoggerFactory.getLogger(aggregatorName);
+ this.LOG = LoggerFactory.getLogger(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName));
}
- public AbstractTimelineAggregator(String aggregatorName,
+ public AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -76,7 +81,8 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
String aggregatorDisableParam,
String tableName,
String outputTableName,
- Long nativeTimeRangeDelay) {
+ Long nativeTimeRangeDelay,
+ TimelineMetricHAController haController) {
this(aggregatorName, hBaseAccessor, metricsConf);
this.checkpointLocation = checkpointLocation;
this.sleepIntervalMillis = sleepIntervalMillis;
@@ -84,7 +90,9 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
this.aggregatorDisableParam = aggregatorDisableParam;
this.tableName = tableName;
this.outputTableName = outputTableName;
- this.nativeTimeRangeDelay = nativeTimeRangeDelay;
+ this.nativeTimeRangeDelay = nativeTimeRangeDelay;
+ this.taskRunner = haController != null && haController.isInitialized() ?
+ haController.getAggregationTaskRunner() : null;
}
@Override
@@ -98,25 +106,39 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
* Access relaxed for tests
*/
public void runOnce(Long SLEEP_INTERVAL) {
+ boolean performAggregationFunction = true;
+ if (taskRunner != null) {
+ switch (getAggregatorType()) {
+ case HOST:
+ performAggregationFunction = taskRunner.performsHostAggregation();
+ break;
+ case CLUSTER:
+ performAggregationFunction = taskRunner.performsClusterAggregation();
+ }
+ }
- long currentTime = System.currentTimeMillis();
- long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
-
- if (lastCheckPointTime != -1) {
- LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
- + ((currentTime - lastCheckPointTime) / 1000)
- + " seconds.");
-
- boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
+ if (performAggregationFunction) {
+ long currentTime = System.currentTimeMillis();
+ long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
- if (success) {
- try {
- saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
- } catch (IOException io) {
- LOG.warn("Error saving checkpoint, restarting aggregation at " +
- "previous checkpoint.");
+ if (lastCheckPointTime != -1) {
+ LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
+ + ((currentTime - lastCheckPointTime) / 1000)
+ + " seconds.");
+
+ boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
+
+ if (success) {
+ try {
+ saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
+ } catch (IOException io) {
+ LOG.warn("Error saving checkpoint, restarting aggregation at " +
+ "previous checkpoint.");
+ }
}
}
+ } else {
+ LOG.info("Skipping aggregation function not owned by this instance.");
}
}
@@ -174,6 +196,9 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
}
protected long readCheckPoint() {
+ if (taskRunner != null) {
+ return taskRunner.getCheckpointManager().readCheckpoint(aggregatorName);
+ }
try {
File checkpoint = new File(getCheckpointLocation());
if (checkpoint.exists()) {
@@ -189,15 +214,23 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
}
protected void saveCheckPoint(long checkpointTime) throws IOException {
- File checkpoint = new File(getCheckpointLocation());
- if (!checkpoint.exists()) {
- boolean done = checkpoint.createNewFile();
- if (!done) {
- throw new IOException("Could not create checkpoint at location, " +
- getCheckpointLocation());
+ if (taskRunner != null) { // HA mode: checkpoint is written through the task runner's CheckpointManager
+ boolean success = taskRunner.getCheckpointManager().writeCheckpoint(aggregatorName, checkpointTime);
+ if (!success) {
+ LOG.error("Error saving checkpoint with AggregationTaskRunner, " +
+ "aggregator = " + aggregatorName + ", value = " + checkpointTime);
}
+ } else { // standalone mode: checkpoint is a local file at getCheckpointLocation()
+ File checkpoint = new File(getCheckpointLocation());
+ if (!checkpoint.exists()) {
+ boolean done = checkpoint.createNewFile();
+ if (!done) {
+ throw new IOException("Could not create checkpoint at location, " +
+ getCheckpointLocation());
+ }
+ }
+ FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
}
- FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
}
/**
@@ -317,4 +350,21 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
return currentTime - (currentTime % aggregatorPeriod);
}
+ /**
+ * Get @AGGREGATOR_TYPE based on the output table.
+ * This is solely used by the HAController to determine which lock to acquire.
+ */
+ public AGGREGATOR_TYPE getAggregatorType() {
+ if (outputTableName.contains("RECORD")) {
+ return AGGREGATOR_TYPE.HOST;
+ } else if (outputTableName.contains("AGGREGATE")) {
+ return AGGREGATOR_TYPE.CLUSTER;
+ }
+ return null;
+ }
+
+ @Override
+ public AGGREGATOR_NAME getName() {
+ return aggregatorName;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
index 295db0e..150e3f1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
@@ -1,5 +1,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,22 +22,38 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
public interface TimelineMetricAggregator extends Runnable {
/**
* Aggregate metric data within the time bounds.
+ *
* @param startTime start time millis
- * @param endTime end time millis
+ * @param endTime end time millis
* @return success
*/
- public boolean doWork(long startTime, long endTime);
+ boolean doWork(long startTime, long endTime);
/**
* Is aggregator is disabled by configuration.
+ *
* @return true/false
*/
- public boolean isDisabled();
+ boolean isDisabled();
/**
* Return aggregator Interval
+ *
* @return Interval in Millis
*/
- public Long getSleepIntervalMillis();
+ Long getSleepIntervalMillis();
+
+ /**
+ * Get aggregator name
+ * @return @AGGREGATOR_NAME
+ */
+ AGGREGATOR_NAME getName();
+ /**
+ * Known aggregator types
+ */
+ enum AGGREGATOR_TYPE {
+ CLUSTER,
+ HOST
}
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index cc85c56..4c44f9e 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -29,12 +30,12 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
@@ -48,6 +49,13 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
@@ -86,7 +94,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 5 mins
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
- (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -104,7 +113,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorMinute",
+ METRIC_RECORD_MINUTE,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -112,12 +121,13 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
return new TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorMinute",
+ METRIC_RECORD_MINUTE,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -125,7 +135,8 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l);
+ 120000l,
+ haController);
}
/**
@@ -133,7 +144,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 1 hour
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
- (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -151,7 +163,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorHourly",
+ METRIC_RECORD_HOURLY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -159,12 +171,13 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 3600000l
+ 3600000l,
+ haController
);
}
return new TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorHourly",
+ METRIC_RECORD_HOURLY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -172,7 +185,8 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 3600000l);
+ 3600000l,
+ haController);
}
/**
@@ -180,7 +194,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 1 day
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
- (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -198,7 +213,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorDaily",
+ METRIC_RECORD_DAILY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -206,12 +221,13 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 3600000l
+ 3600000l,
+ haController
);
}
return new TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorDaily",
+ METRIC_RECORD_DAILY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -219,7 +235,8 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 3600000l);
+ 3600000l,
+ haController);
}
/**
@@ -229,7 +246,8 @@ public class TimelineMetricAggregatorFactory {
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
- TimelineMetricMetadataManager metadataManager) {
+ TimelineMetricMetadataManager metadataManager,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -252,7 +270,7 @@ public class TimelineMetricAggregatorFactory {
// Second based aggregation have added responsibility of time slicing
return new TimelineMetricClusterAggregatorSecond(
- "TimelineClusterAggregatorSecond",
+ METRIC_AGGREGATE_SECOND,
metadataManager,
hBaseAccessor, metricsConf,
checkpointLocation,
@@ -262,7 +280,8 @@ public class TimelineMetricAggregatorFactory {
inputTableName,
outputTableName,
120000l,
- timeSliceIntervalMillis
+ timeSliceIntervalMillis,
+ haController
);
}
@@ -271,7 +290,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 5 mins
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
- PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -291,7 +311,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorMinute",
+ METRIC_AGGREGATE_MINUTE,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -299,12 +319,13 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
return new TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorMinute",
+ METRIC_AGGREGATE_MINUTE,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -312,7 +333,8 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
@@ -321,7 +343,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 1 hour
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
- PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -341,7 +364,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorHourly",
+ METRIC_AGGREGATE_HOURLY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -349,12 +372,13 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
return new TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorHourly",
+ METRIC_AGGREGATE_HOURLY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -362,7 +386,8 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
@@ -371,7 +396,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 1 day
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
- PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -391,7 +417,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorDaily",
+ METRIC_AGGREGATE_DAILY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -399,12 +425,13 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
return new TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorDaily",
+ METRIC_AGGREGATE_DAILY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -412,7 +439,8 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index f90b01f..6438256 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
@@ -36,7 +38,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true);
private final boolean isClusterPrecisionInputTable;
- public TimelineMetricClusterAggregator(String aggregatorName,
+ public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -45,11 +47,12 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
String hostAggregatorDisabledParam,
String inputTableName,
String outputTableName,
- Long nativeTimeRangeDelay) {
+ Long nativeTimeRangeDelay,
+ TimelineMetricHAController haController) {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier,
hostAggregatorDisabledParam, inputTableName, outputTableName,
- nativeTimeRangeDelay);
+ nativeTimeRangeDelay, haController);
isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index e0e065b..9c11d39 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -22,10 +22,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -39,7 +41,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
/**
* Aggregates a metric across all hosts in the cluster. Reads metrics from
@@ -54,8 +55,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
private final Long serverTimeShiftAdjustment;
private final boolean interpolationEnabled;
-
- public TimelineMetricClusterAggregatorSecond(String aggregatorName,
+ public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
TimelineMetricMetadataManager metadataManager,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
@@ -66,10 +66,11 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
String tableName,
String outputTableName,
Long nativeTimeRangeDelay,
- Long timeSliceInterval) {
+ Long timeSliceInterval,
+ TimelineMetricHAController haController) {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
- tableName, outputTableName, nativeTimeRangeDelay);
+ tableName, outputTableName, nativeTimeRangeDelay, haController);
appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf);
this.timeSliceIntervalMillis = timeSliceInterval;
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index 26e73b0..364a4b5 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -22,20 +22,24 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
+
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
- public TimelineMetricHostAggregator(String aggregatorName,
+ public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -44,10 +48,11 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
String hostAggregatorDisabledParam,
String tableName,
String outputTableName,
- Long nativeTimeRangeDelay) {
+ Long nativeTimeRangeDelay,
+ TimelineMetricHAController haController) {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
- tableName, outputTableName, nativeTimeRangeDelay);
+ tableName, outputTableName, nativeTimeRangeDelay, haController);
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
index c056d79..aeddf06 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -20,19 +20,23 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
+
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
+
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
private final String aggregateColumnName;
- public TimelineMetricClusterAggregator(String aggregatorName,
+ public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -41,11 +45,12 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
String hostAggregatorDisabledParam,
String inputTableName,
String outputTableName,
- Long nativeTimeRangeDelay) {
+ Long nativeTimeRangeDelay,
+ TimelineMetricHAController haController) {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier,
hostAggregatorDisabledParam, inputTableName, outputTableName,
- nativeTimeRangeDelay);
+ nativeTimeRangeDelay, haController);
if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
aggregateColumnName = "HOSTS_COUNT";
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
index 118c695..0df8329 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
+
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -31,7 +34,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
- public TimelineMetricHostAggregator(String aggregatorName,
+ public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -40,10 +43,11 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
String hostAggregatorDisabledParam,
String tableName,
String outputTableName,
- Long nativeTimeRangeDelay) {
+ Long nativeTimeRangeDelay,
+ TimelineMetricHAController haController) {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
- tableName, outputTableName, nativeTimeRangeDelay);
+ tableName, outputTableName, nativeTimeRangeDelay, haController);
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
new file mode 100644
index 0000000..4a1f17b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.participant.StateMachineEngine;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.HOST;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME;
+
+/**
+ * Helix participant for one metrics collector instance.
+ * <p>
+ * {@link #initialize()} joins the Helix cluster {@code CLUSTER_NAME} as a
+ * {@link InstanceType#PARTICIPANT} and registers an
+ * {@link OnlineOfflineStateModelFactory}; its state transitions toggle the
+ * ownership flags exposed by {@link #performsClusterAggregation()} and
+ * {@link #performsHostAggregation()}. Aggregator checkpoints are read and
+ * written through the {@link CheckpointManager} returned by
+ * {@link #getCheckpointManager()}.
+ */
+public class AggregationTaskRunner {
+  private static final Log LOG = LogFactory.getLog(AggregationTaskRunner.class);
+
+  private final String instanceName;
+  private final String zkAddress;
+  private HelixManager manager;
+  private CheckpointManager checkpointManager;
+
+  // Map partition name to an aggregator dimension
+  static final Map<String, AGGREGATOR_TYPE> PARTITION_AGGREGATION_TYPES = new HashMap<>();
+
+  // Ownership flags, set and cleared by the Helix state transitions
+  private final AtomicBoolean performsClusterAggregation = new AtomicBoolean(false);
+  private final AtomicBoolean performsHostAggregation = new AtomicBoolean(false);
+
+  /**
+   * Logical aggregator identifiers; {@link #ACTUAL_AGGREGATOR_NAMES} gives the
+   * concrete aggregator name for each one.
+   */
+  public enum AGGREGATOR_NAME {
+    METRIC_RECORD_MINUTE,
+    METRIC_RECORD_HOURLY,
+    METRIC_RECORD_DAILY,
+    METRIC_AGGREGATE_SECOND,
+    METRIC_AGGREGATE_MINUTE,
+    METRIC_AGGREGATE_HOURLY,
+    METRIC_AGGREGATE_DAILY
+  }
+
+  // Logical aggregator name -> concrete aggregator name (also the checkpoint znode name)
+  public static final Map<AGGREGATOR_NAME, String> ACTUAL_AGGREGATOR_NAMES = new HashMap<>();
+
+  static {
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_MINUTE, "TimelineMetricHostAggregatorMinute");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_HOURLY, "TimelineMetricHostAggregatorHourly");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_DAILY, "TimelineMetricHostAggregatorDaily");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_SECOND, "TimelineClusterAggregatorSecond");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_MINUTE, "TimelineClusterAggregatorMinute");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_HOURLY, "TimelineClusterAggregatorHourly");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_DAILY, "TimelineClusterAggregatorDaily");
+
+    // Partition name to task assignment
+    PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_0", CLUSTER);
+    PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_1", HOST);
+  }
+
+  /**
+   * @param instanceName Helix instance name of this collector
+   * @param zkAddress    ZooKeeper address passed to the Helix manager
+   */
+  public AggregationTaskRunner(String instanceName, String zkAddress) {
+    this.instanceName = instanceName;
+    this.zkAddress = zkAddress;
+  }
+
+  /**
+   * Connects to the Helix cluster as a participant, registers the online/offline
+   * state model factory and creates the {@link CheckpointManager} backed by the
+   * Helix property store.
+   *
+   * @throws Exception if the Helix manager cannot be created or connected
+   */
+  public void initialize() throws Exception {
+    manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceName,
+      InstanceType.PARTICIPANT, zkAddress);
+
+    OnlineOfflineStateModelFactory stateModelFactory =
+      new OnlineOfflineStateModelFactory(instanceName, this);
+
+    StateMachineEngine stateMach = manager.getStateMachineEngine();
+    stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory);
+    manager.connect();
+
+    checkpointManager = new CheckpointManager(manager.getHelixPropertyStore());
+  }
+
+  /** @return true while this instance owns the cluster aggregation partition */
+  public boolean performsClusterAggregation() {
+    return performsClusterAggregation.get();
+  }
+
+  /** @return true while this instance owns the host aggregation partition */
+  public boolean performsHostAggregation() {
+    return performsHostAggregation.get();
+  }
+
+  /** @return checkpoint store, or {@code null} until {@link #initialize()} completes */
+  public CheckpointManager getCheckpointManager() {
+    return checkpointManager;
+  }
+
+  /**
+   * Claims an aggregation function for this instance. Called from the Helix state
+   * model when this instance takes over a partition.
+   *
+   * @param type dimension mapped to the partition; {@code null} (a partition with
+   *             no entry in PARTITION_AGGREGATION_TYPES) is logged and ignored
+   *             instead of failing the state transition with an NPE
+   */
+  public void setPartitionAggregationFunction(AGGREGATOR_TYPE type) {
+    if (type == null) {
+      LOG.warn("No aggregator function mapped to partition, nothing to set for : " + instanceName);
+      return;
+    }
+    switch (type) {
+      case HOST:
+        performsHostAggregation.set(true);
+        LOG.info("Set host aggregator function for : " + instanceName);
+        break;
+      case CLUSTER:
+        performsClusterAggregation.set(true);
+        LOG.info("Set cluster aggregator function for : " + instanceName);
+        break;
+      default:
+        LOG.warn("Unsupported aggregator type " + type + " for : " + instanceName);
+    }
+  }
+
+  /**
+   * Releases an aggregation function for this instance. Called from the Helix
+   * state model when this instance gives up a partition.
+   *
+   * @param type dimension mapped to the partition; {@code null} is logged and ignored
+   */
+  public void unsetPartitionAggregationFunction(AGGREGATOR_TYPE type) {
+    if (type == null) {
+      LOG.warn("No aggregator function mapped to partition, nothing to unset for : " + instanceName);
+      return;
+    }
+    switch (type) {
+      case HOST:
+        performsHostAggregation.set(false);
+        LOG.info("Unset host aggregator function for : " + instanceName);
+        break;
+      case CLUSTER:
+        performsClusterAggregation.set(false);
+        LOG.info("Unset cluster aggregator function for : " + instanceName);
+        break;
+      default:
+        LOG.warn("Unsupported aggregator type " + type + " for : " + instanceName);
+    }
+  }
+
+  /**
+   * Disconnect participant before controller shutdown. A no-op on the Helix side
+   * if {@link #initialize()} never ran; the ownership flags are always cleared
+   * since a disconnected participant no longer owns any partition.
+   */
+  void stop() {
+    if (manager != null) {
+      manager.disconnect();
+    }
+    performsClusterAggregation.set(false);
+    performsHostAggregation.set(false);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
new file mode 100644
index 0000000..439102f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
+
+/**
+ * Reads and writes aggregator checkpoints as a long field of a {@link ZNRecord}
+ * stored in the Helix property store, one znode per aggregator under
+ * {@code /CHECKPOINTS}.
+ */
+public class CheckpointManager {
+  private static final Log LOG = LogFactory.getLog(CheckpointManager.class);
+
+  // ZNRecord field holding the checkpoint timestamp
+  static final String ZNODE_FIELD = "checkpoint";
+  // Parent znode of all checkpoint znodes
+  static final String CHECKPOINT_PATH_PREFIX = "CHECKPOINTS";
+
+  private final ZkHelixPropertyStore<ZNRecord> propertyStore;
+
+  /**
+   * @param propertyStore Helix property store that holds the checkpoint znodes
+   */
+  public CheckpointManager(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    this.propertyStore = propertyStore;
+  }
+
+  /**
+   * Read aggregator checkpoint from zookeeper.
+   *
+   * @param aggregatorName aggregator whose checkpoint is read
+   * @return stored timestamp, or -1 when there is no record or no checkpoint field
+   * @throws IllegalArgumentException if {@code aggregatorName} has no registered name
+   */
+  public long readCheckpoint(AGGREGATOR_NAME aggregatorName) {
+    String path = getCheckpointZKPath(aggregatorName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reading checkpoint at " + path);
+    }
+    Stat stat = new Stat();
+    ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Stat => " + stat);
+    }
+    long checkpoint = znRecord != null ? znRecord.getLongField(ZNODE_FIELD, -1) : -1;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Checkpoint value = " + checkpoint);
+    }
+    return checkpoint;
+  }
+
+  /**
+   * Write aggregator checkpoint in zookeeper; a new record is created when the
+   * znode holds no data yet.
+   *
+   * @param aggregatorName aggregator whose checkpoint is written
+   * @param value timestamp
+   * @return success
+   * @throws IllegalArgumentException if {@code aggregatorName} has no registered name
+   */
+  public boolean writeCheckpoint(AGGREGATOR_NAME aggregatorName, long value) {
+    String path = getCheckpointZKPath(aggregatorName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Saving checkpoint at %s with value %s", path, value));
+    }
+    return propertyStore.update(path, new CheckpointDataUpdater(path, value), AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Sets {@link #ZNODE_FIELD} on the current record, or on a new record whose id
+   * is the znode path when the znode has no data.
+   */
+  static class CheckpointDataUpdater implements DataUpdater<ZNRecord> {
+    final String path;
+    final Long value;
+
+    public CheckpointDataUpdater(String path, Long value) {
+      this.path = path;
+      this.value = value;
+    }
+
+    @Override
+    public ZNRecord update(ZNRecord currentData) {
+      ZNRecord record = currentData != null ? currentData : new ZNRecord(path);
+      record.setLongField(ZNODE_FIELD, value);
+      return record;
+    }
+  }
+
+  /**
+   * Builds the checkpoint znode path {@code /CHECKPOINTS/<actual aggregator name>}.
+   *
+   * @throws IllegalArgumentException if no actual name is registered for
+   *         {@code aggregatorName} (including null), instead of silently
+   *         building a shared {@code /CHECKPOINTS/null} path
+   */
+  String getCheckpointZKPath(AGGREGATOR_NAME aggregatorName) {
+    String actualName = ACTUAL_AGGREGATOR_NAMES.get(aggregatorName);
+    if (actualName == null) {
+      throw new IllegalArgumentException("No checkpoint name registered for aggregator " + aggregatorName);
+    }
+    return "/" + CHECKPOINT_PATH_PREFIX + "/" + actualName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000..7d3350b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES;
+
+/**
+ * Helix {@link StateModelFactory} that creates one {@link OnlineOfflineStateModel}
+ * per partition. The models translate ONLINE/OFFLINE transitions into calls on
+ * the {@link AggregationTaskRunner}, which tracks the aggregation functions
+ * (see {@code PARTITION_AGGREGATION_TYPES}) this instance owns.
+ */
+public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  private static final Log LOG = LogFactory.getLog(OnlineOfflineStateModelFactory.class);
+  private final String instanceName;
+  private final AggregationTaskRunner taskRunner;
+
+  /**
+   * @param instanceName Helix instance name, used in log messages
+   * @param taskRunner   runner whose ownership flags the transitions update
+   */
+  public OnlineOfflineStateModelFactory(String instanceName, AggregationTaskRunner taskRunner) {
+    this.instanceName = instanceName;
+    this.taskRunner = taskRunner;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partition) {
+    LOG.info("Received request to process partition = " + partition + ", for " +
+      "resource = " + resourceName + ", at " + instanceName);
+    return new OnlineOfflineStateModel(partition);
+  }
+
+  /**
+   * State model for one partition; every transition maps the partition to its
+   * aggregation dimension and sets or clears the matching flag on the runner.
+   */
+  public class OnlineOfflineStateModel extends StateModel {
+    // Partition this model was created for; null if built with the no-arg constructor
+    private final String partitionName;
+
+    public OnlineOfflineStateModel() {
+      this(null);
+    }
+
+    public OnlineOfflineStateModel(String partitionName) {
+      this.partitionName = partitionName;
+    }
+
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      String partition = message.getPartitionName();
+      LOG.info("Received transition to Online from Offline for partition: " + partition);
+      updateOwnership(partition, true);
+    }
+
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      String partition = message.getPartitionName();
+      LOG.info("Received transition to Offline from Online for partition: " + partition);
+      updateOwnership(partition, false);
+    }
+
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      String partition = message.getPartitionName();
+      LOG.info("Received transition to Dropped from Offline for partition: " + partition);
+      updateOwnership(partition, false);
+    }
+
+    /**
+     * Helix resets a state model without running onBecomeOfflineFromOnline (per the
+     * Helix StateModel docs, e.g. when the participant session expires), so the
+     * partition's aggregation function is released here; otherwise this instance
+     * would keep reporting ownership of a partition Helix may have reassigned.
+     */
+    @Override
+    public void reset() {
+      LOG.info("Reset state model for partition: " + partitionName);
+      if (partitionName != null) {
+        updateOwnership(partitionName, false);
+      }
+    }
+
+    // Sets (owned) or clears the runner's flag for the partition's aggregation
+    // dimension; a partition missing from PARTITION_AGGREGATION_TYPES is logged
+    // and skipped instead of passing null on to the runner.
+    private void updateOwnership(String partition, boolean owned) {
+      AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partition);
+      if (type == null) {
+        LOG.warn("No aggregator function mapped to partition: " + partition);
+        return;
+      }
+      if (owned) {
+        taskRunner.setPartitionAggregationFunction(type);
+      } else {
+        taskRunner.unsetPartitionAggregationFunction(type);
+      }
+    }
+  }
+}
\ No newline at end of file