You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2016/03/30 20:35:27 UTC
[1/2] ambari git commit: AMBARI-15623. Support distributed
aggregation for multiple AMS instances. (swagle)
Repository: ambari
Updated Branches:
refs/heads/trunk c479fdd96 -> dfa4454e7
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
new file mode 100644
index 0000000..53b9e7e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricsSystemInitializationException;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.OnlineOfflineSMD;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO;
+
+/**
+ * Registers this collector instance with a Helix managed cluster stored in
+ * zookeeper, so that the {@link #METRIC_AGGREGATORS} resource can be
+ * distributed across collector instances.
+ * <p/>
+ * The constructor only reads configuration. Nothing is written to zookeeper
+ * until {@link #initializeHAController()} is called, which creates the
+ * cluster, instance, state model and resource when missing and then starts
+ * the {@link AggregationTaskRunner} and a Helix controller.
+ */
+public class TimelineMetricHAController {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricHAController.class);
+
+  static final String CLUSTER_NAME = "ambari-metrics-cluster";
+  static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
+  static final String STATE_MODEL_NAME = OnlineOfflineSMD.name;
+  // Separates hostname and port inside the Helix instance name
+  static final String INSTANCE_NAME_DELIMITER = "_";
+
+  final String zkConnectUrl;
+  final String instanceHostname;
+  final InstanceConfig instanceConfig;
+  final AggregationTaskRunner aggregationTaskRunner;
+
+  // Cache list of known live instances. It is rewritten by the Helix live
+  // instance callback and read through getLiveInstanceHostNames(), which may
+  // run on different threads, so every access synchronizes on the list.
+  final List<String> liveInstanceNames = new ArrayList<>();
+
+  // Helix Admin
+  HelixAdmin admin;
+  // Helix Manager
+  HelixManager manager;
+
+  private volatile boolean isInitialized = false;
+
+  /**
+   * Reads the instance hostname / port and the zookeeper quorum / client
+   * port from the given configuration and prepares the Helix instance
+   * config named <code>hostname_port</code>.
+   *
+   * @param configuration source of the instance and zookeeper settings
+   * @throws MetricsSystemInitializationException if any of the settings
+   * cannot be read, or the zookeeper quorum or client port is empty
+   */
+  public TimelineMetricHAController(TimelineMetricConfiguration configuration) {
+    String instancePort;
+    try {
+      instanceHostname = configuration.getInstanceHostnameFromEnv();
+      instancePort = configuration.getInstancePort();
+
+    } catch (Exception e) {
+      // There is no fallback here: initialization is aborted below.
+      LOG.error("Unable to read collector instance hostname and port from configuration.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+
+    try {
+      String zkClientPort = configuration.getZKClientPort();
+      String zkQuorum = configuration.getZKQuorum();
+
+      if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
+        throw new Exception("Unable to parse zookeeper quorum. clientPort = "
+          + zkClientPort +", quorum = " + zkQuorum);
+      }
+
+      zkConnectUrl = getZkConnectionUrl(zkClientPort, zkQuorum);
+
+    } catch (Exception e) {
+      LOG.error("Unable to load hbase-site from classpath.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+
+    instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
+    instanceConfig.setHostName(instanceHostname);
+    instanceConfig.setPort(instancePort);
+    instanceConfig.setInstanceEnabled(true);
+    aggregationTaskRunner = new AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl);
+  }
+
+  /**
+   * Initialize the instance with zookeeper via Helix.
+   * <p/>
+   * Creates the cluster node, adds this instance, the ONLINE-OFFLINE state
+   * model and the {@link #METRIC_AGGREGATORS} resource when they are not
+   * present yet, rebalances the resource, starts the aggregators and the
+   * controller and finally registers a JVM shutdown hook that stops the
+   * task runner and disconnects the Helix manager.
+   *
+   * @throws Exception if any Helix / zookeeper operation fails
+   */
+  public void initializeHAController() throws Exception {
+    admin = new ZKHelixAdmin(zkConnectUrl);
+    // create cluster
+    LOG.info("Creating zookeeper cluster node: " + CLUSTER_NAME);
+    admin.addCluster(CLUSTER_NAME, false);
+
+    // Adding host to the cluster
+    List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
+    if (nodes == null || !nodes.contains(instanceConfig.getInstanceName())) {
+      LOG.info("Adding participant instance " + instanceConfig);
+      admin.addInstance(CLUSTER_NAME, instanceConfig);
+    }
+
+    // Add a state model
+    if (admin.getStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME) == null) {
+      LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
+      admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, OnlineOfflineSMD.build());
+    }
+
+    // Add resources with 1 cluster-wide replica
+    // Since our aggregators are unbalanced in terms of work distribution we
+    // only need to distribute writes to METRIC_AGGREGATE and
+    // METRIC_RECORD_MINUTE
+    List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
+    // Same null tolerance as for the instance list above
+    if (resources == null || !resources.contains(METRIC_AGGREGATORS)) {
+      LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas");
+      admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, 2, OnlineOfflineSMD.name, FULL_AUTO.toString());
+    }
+    // this will set up the ideal state, it calculates the preference list for
+    // each partition similar to consistent hashing
+    admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
+
+    // Start participant
+    startAggregators();
+
+    // Start controller
+    startController();
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        try {
+          aggregationTaskRunner.stop();
+        } finally {
+          // Disconnect even when stopping the task runner fails
+          manager.disconnect();
+        }
+      }
+    });
+
+    isInitialized = true;
+  }
+
+  /**
+   * Return true once {@link #initializeHAController()} has run to completion.
+   */
+  public boolean isInitialized() {
+    return isInitialized;
+  }
+
+  /**
+   * Initializes the aggregation task runner.
+   *
+   * @throws MetricsSystemInitializationException if initialization fails
+   */
+  private void startAggregators() {
+    try {
+      aggregationTaskRunner.initialize();
+
+    } catch (Exception e) {
+      LOG.error("Unable to start aggregators.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+  }
+
+  /**
+   * Connects a Helix manager of type CONTROLLER, named after the instance
+   * hostname, and registers a {@link HelixController} as live instance
+   * change listener.
+   */
+  private void startController() throws Exception {
+    manager = HelixManagerFactory.getZKHelixManager(
+      CLUSTER_NAME,
+      instanceHostname,
+      InstanceType.CONTROLLER,
+      zkConnectUrl
+    );
+
+    manager.connect();
+    HelixController controller = new HelixController();
+    manager.addLiveInstanceChangeListener(controller);
+  }
+
+  /**
+   * Builds a comma separated <code>host:port</code> list from the quorum.
+   * Entries that already contain a port are kept as they are, all others
+   * get the client port appended. Blank entries are skipped.
+   *
+   * @param zkClientPort port appended to entries without an explicit port
+   * @param zkQuorum comma separated list of zookeeper hosts
+   * @return the zookeeper connection string
+   */
+  private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
+    StringBuilder sb = new StringBuilder();
+    for (String part : zkQuorum.split(",")) {
+      String host = part.trim();
+      if (host.isEmpty()) {
+        // e.g. "a,,b" - a bare ":port" entry is not a usable server address
+        continue;
+      }
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append(host);
+      if (!host.contains(":")) {
+        sb.append(":");
+        sb.append(zkClientPort);
+      }
+    }
+
+    return sb.toString();
+  }
+
+  public AggregationTaskRunner getAggregationTaskRunner() {
+    return aggregationTaskRunner;
+  }
+
+  /**
+   * Returns the hostnames of the currently cached live instances.
+   * <p/>
+   * Instance names have the form <code>hostname_port</code>, so the name is
+   * cut at the LAST delimiter. This keeps hostnames that contain an
+   * underscore intact. A name without delimiter is returned unchanged.
+   *
+   * @return a new list, never null, safe for the caller to modify
+   */
+  public List<String> getLiveInstanceHostNames() {
+    List<String> liveInstanceHostNames;
+
+    synchronized (liveInstanceNames) {
+      liveInstanceHostNames = new ArrayList<>(liveInstanceNames.size());
+      for (String instance : liveInstanceNames) {
+        int delimiterIndex = instance.lastIndexOf(INSTANCE_NAME_DELIMITER);
+        liveInstanceHostNames.add(
+          delimiterIndex < 0 ? instance : instance.substring(0, delimiterIndex));
+      }
+    }
+
+    return liveInstanceHostNames;
+  }
+
+  /**
+   * Helix controller that additionally refreshes the cached list of live
+   * instance names and logs the cluster state 30 seconds after each change.
+   */
+  public class HelixController extends GenericHelixController {
+    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+    Joiner joiner = Joiner.on(", ").skipNulls();
+
+    @Override
+    public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
+      super.onLiveInstanceChange(liveInstances, changeContext);
+
+      String joinedNames;
+      // Replace the cached names under the lock so that readers never see a
+      // partially filled list.
+      synchronized (liveInstanceNames) {
+        liveInstanceNames.clear();
+        for (LiveInstance instance : liveInstances) {
+          liveInstanceNames.add(instance.getInstanceName());
+        }
+        joinedNames = joiner.join(liveInstanceNames);
+      }
+
+      LOG.info("Detected change in liveliness of Collector instances. " +
+        "LiveInstances = " + joinedNames);
+      // Print HA state - after some delay
+      executorService.schedule(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            printClusterState();
+          } catch (Exception e) {
+            // An exception escaping a scheduled task is only stored in its
+            // future, which nobody reads here, so log it explicitly.
+            LOG.warn("Unable to print cluster HA state.", e);
+          }
+        }
+      }, 30, TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Logs the partition to instance state mapping of the
+   * {@link #METRIC_AGGREGATORS} resource at INFO level. Only a warning is
+   * logged when the Helix admin has not been created yet.
+   */
+  public void printClusterState() {
+    if (admin == null) {
+      LOG.warn("Cluster HA state is not available, HA controller is not initialized.");
+      return;
+    }
+
+    StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");
+
+    ExternalView resourceExternalView = admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
+    if (resourceExternalView != null) {
+      getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
+    }
+    sb.append("\n##################################################");
+    LOG.info(sb.toString());
+  }
+
+  /**
+   * Appends cluster name, resource name and, for every partition in sorted
+   * order, the instance / state pairs of the external view to the builder.
+   *
+   * @param resourceExternalView external view to print
+   * @param resourceName name printed as RESOURCE
+   * @param sb builder that receives the output
+   */
+  private void getPrintableResourceState(ExternalView resourceExternalView,
+                                         String resourceName,
+                                         StringBuilder sb) {
+    TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet());
+    sb.append("\nCLUSTER: ");
+    sb.append(CLUSTER_NAME);
+    sb.append("\nRESOURCE: ");
+    sb.append(resourceName);
+    for (String partitionName : sortedSet) {
+      sb.append("\nPARTITION: ");
+      sb.append(partitionName).append("\t");
+      Map<String, String> states = resourceExternalView.getStateMap(partitionName);
+      if (states == null) {
+        // Nothing to print for a partition without a state map
+        continue;
+      }
+      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
+        sb.append("\t");
+        sb.append(stateEntry.getKey());
+        sb.append("\t");
+        sb.append(stateEntry.getValue());
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
index 0ea136b..c5761f7 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
@@ -32,7 +32,7 @@ import java.sql.SQLException;
public class DefaultPhoenixDataSource implements PhoenixConnectionProvider {
static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
- private static final String ZOOKEEPER_CLIENT_PORT ="hbase.zookeeper.property.clientPort";
+ private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
private static final String ZNODE_PARENT = "zookeeper.znode.parent";
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 873671a..d2526a0 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -423,6 +423,18 @@ public class TimelineWebServices {
}
}
+ /**
+ * Handles GET requests on "/metrics/livenodes" and returns, as JSON, the
+ * list obtained from timelineMetricStore.getLiveInstances().
+ */
+ @GET
+ @Path("/metrics/livenodes")
+ @Produces({ MediaType.APPLICATION_JSON })
+ public List<String> getLiveCollectorNodes(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res
+ ) {
+ // NOTE(review): init(res) is defined outside this hunk; presumably it
+ // prepares the response before the body is produced - confirm.
+ init(res);
+
+ return timelineMetricStore.getLiveInstances();
+ }
+
/**
* Store the given entities into the timeline store, and return the errors
* that happen during storing.
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 89f3fbe..f0030a1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -128,7 +128,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
public void testGetMetricRecordsMinutes() throws IOException, SQLException {
// GIVEN
TimelineMetricAggregator aggregatorMinute =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration());
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration(), null);
long startTime = System.currentTimeMillis();
long ctime = startTime;
@@ -165,7 +165,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
public void testGetMetricRecordsHours() throws IOException, SQLException {
// GIVEN
TimelineMetricAggregator aggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration());
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration(), null);
MetricHostAggregate expectedAggregate =
createMetricHostAggregate(2.0, 0.0, 20, 15.0);
@@ -217,7 +217,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
- hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()));
+ hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
long startTime = System.currentTimeMillis();
long ctime = startTime + 1;
@@ -256,8 +256,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
public void testGetClusterMetricRecordLatestWithFunction() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond
- (hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()));
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
+ new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
long startTime = System.currentTimeMillis();
long ctime = startTime + 1;
@@ -297,7 +297,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
public void testGetClusterMetricRecordsHours() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration());
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration(), null);
long startTime = System.currentTimeMillis();
long ctime = startTime;
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index 8f8067b..61d6e71 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -94,4 +94,9 @@ public class TestTimelineMetricStore implements TimelineMetricStore {
public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
return Collections.emptyMap();
}
+
+ /**
+ * Test stub: always returns an empty list of live instances.
+ */
+ @Override
+ public List<String> getLiveInstances() {
+ return Collections.emptyList();
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
index 827f399..ea947d0 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.junit.Before;
import org.junit.Test;
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicLong;
+
import static junit.framework.Assert.assertEquals;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
@@ -44,7 +45,7 @@ public class AbstractTimelineAggregatorTest {
@Before
public void setUp() throws Exception {
- sleepIntervalMillis = 5*60*1000l; //5 minutes
+ sleepIntervalMillis = 5 * 60 * 1000l; //5 minutes
checkpointCutOffMultiplier = 2;
Configuration metricsConf = new Configuration();
@@ -56,7 +57,7 @@ public class AbstractTimelineAggregatorTest {
checkPoint = new AtomicLong(-1);
actualRuns = 0;
- agg = new AbstractTimelineAggregator("TimelineAggregatorTest", null, metricsConf) {
+ agg = new AbstractTimelineAggregator(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND, null, metricsConf) {
@Override
public boolean doWork(long startTime, long endTime) {
startTimeInDoWork.set(startTime);
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
index f201224..590f82a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -77,7 +77,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
- getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+ getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -130,7 +130,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
- getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+ getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -206,7 +206,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
- getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+ getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
// here we put some metrics tha will be aggregated
@@ -270,7 +270,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
public void testAggregateDailyClusterMetrics() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false));
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false), null);
// this time can be virtualized! or made independent from real clock
long startTime = System.currentTimeMillis();
@@ -315,7 +315,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
public void testShouldAggregateClusterOnMinuteProperly() throws Exception {
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false), null);
long startTime = System.currentTimeMillis();
long ctime = startTime;
@@ -382,7 +382,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
public void testShouldAggregateClusterOnHourProperly() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false), null);
// this time can be virtualized! or made independent from real clock
long startTime = System.currentTimeMillis();
@@ -426,7 +426,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false), null);
long startTime = System.currentTimeMillis();
long ctime = startTime;
@@ -490,7 +490,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
- conf, new TimelineMetricMetadataManager(hdb, new Configuration()));
+ conf, new TimelineMetricMetadataManager(hdb, new Configuration()), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -542,7 +542,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
public void testClusterAggregateMetricNormalization() throws Exception {
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
- getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()));
+ getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
// Sample data
@@ -619,7 +619,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
public void testAggregationUsingGroupByQuery() throws Exception {
// GIVEN
TimelineMetricAggregator agg =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true));
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true), null);
long startTime = System.currentTimeMillis();
long ctime = startTime;
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
index 9c7c8fa..9873643 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
@@ -22,24 +22,13 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
-import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -95,7 +84,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator aggregatorMinute =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
- getConfigurationForTest(false));
+ getConfigurationForTest(false), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -156,7 +145,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator aggregator =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb,
- getConfigurationForTest(false));
+ getConfigurationForTest(false), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -219,7 +208,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator aggregator =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb,
- getConfigurationForTest(false));
+ getConfigurationForTest(false), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -281,7 +270,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator aggregatorMinute =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
- getConfigurationForTest(true));
+ getConfigurationForTest(true), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index d2d478c..f55dda1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -24,11 +24,12 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.easymock.EasyMock;
import org.junit.Test;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+
public class TimelineMetricClusterAggregatorSecondTest {
@Test
@@ -42,9 +43,9 @@ public class TimelineMetricClusterAggregatorSecondTest {
TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
- "TimelineClusterAggregatorSecond", metricMetadataManagerMock, null, configuration, null,
- aggregatorInterval, 2, "false", "", "", aggregatorInterval, sliceInterval
- );
+ METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null,
+ configuration, null, aggregatorInterval, 2, "false", "", "",
+ aggregatorInterval, sliceInterval, null);
secondAggregator.timeSliceIntervalMillis = sliceInterval;
long roundedEndTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(aggregatorInterval);
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
new file mode 100644
index 0000000..04e8909
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import junit.framework.Assert;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+/**
+ * Exercises {@link TimelineMetricHAController} against the ZooKeeper endpoint
+ * parsed from the test cluster's JDBC URL: first with a single controller
+ * instance, then with a second Helix participant joining the same cluster.
+ */
+public class TimelineMetricHAControllerTest extends AbstractMiniHBaseClusterTest {
+  TimelineMetricConfiguration configuration;
+
+  /**
+   * Builds a mocked configuration that identifies this instance as
+   * {@code h1:12000} and points at the ZooKeeper quorum/port taken from
+   * {@code getUrl()}.
+   */
+  @Before
+  public void setup() throws Exception {
+    configuration = createNiceMock(TimelineMetricConfiguration.class);
+
+    expect(configuration.getInstanceHostnameFromEnv()).andReturn("h1");
+    expect(configuration.getInstancePort()).andReturn("12000");
+    // jdbc:phoenix:localhost:52887:/hbase;test=true
+    String[] urlParts = getUrl().split(":");
+    String quorum = urlParts[2];
+    String port = urlParts[3];
+
+    expect(configuration.getZKClientPort()).andReturn(port);
+    expect(configuration.getZKQuorum()).andReturn(quorum);
+
+    replay(configuration);
+  }
+
+  /**
+   * Verifies that a lone controller performs both cluster and host
+   * aggregation, and that after a second participant joins and the resource
+   * is rebalanced, both partitions are reported ONLINE.
+   */
+  @Test(timeout = 150000)
+  public void testHAControllerDistributedAggregation() throws Exception {
+    TimelineMetricHAController haController = new TimelineMetricHAController(configuration);
+    HelixManager manager2 = null;
+
+    try {
+      haController.initializeHAController();
+      // Wait for task assignment
+      Thread.sleep(10000);
+
+      Assert.assertTrue(haController.isInitialized());
+      Assert.assertEquals(1, haController.getLiveInstanceHostNames().size());
+      Assert.assertTrue(haController.getAggregationTaskRunner().performsClusterAggregation());
+      Assert.assertTrue(haController.getAggregationTaskRunner().performsHostAggregation());
+
+      // Add new instance
+      InstanceConfig instanceConfig2 = new InstanceConfig("h2_12001");
+      haController.admin.addInstance(CLUSTER_NAME, instanceConfig2);
+      manager2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+        instanceConfig2.getInstanceName(),
+        InstanceType.PARTICIPANT, haController.zkConnectUrl);
+      manager2.getStateMachineEngine().registerStateModelFactory(STATE_MODEL_NAME,
+        new OnlineOfflineStateModelFactory(instanceConfig2.getInstanceName(),
+          new AggregationTaskRunner(instanceConfig2.getInstanceName(), "")));
+      manager2.connect();
+      haController.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
+
+      // Wait on re-assignment of partitions
+      Thread.sleep(10000);
+      Assert.assertEquals(2, haController.getLiveInstanceHostNames().size());
+
+      ExternalView view = haController.admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
+
+      Map<String, String> partitionInstanceMap = new HashMap<>();
+
+      for (String partition : view.getPartitionSet()) {
+        Map<String, String> states = view.getStateMap(partition);
+        // (instance, state) pairs
+        for (Map.Entry<String, String> stateEntry : states.entrySet()) {
+          partitionInstanceMap.put(partition, stateEntry.getKey());
+          Assert.assertEquals("ONLINE", stateEntry.getValue());
+        }
+      }
+      // Re-assigned partitions
+      Assert.assertEquals(2, partitionInstanceMap.size());
+    } finally {
+      // Always release the Helix connections, including the second
+      // participant, so a failed assertion does not leave them open for the
+      // tests that run afterwards.
+      if (manager2 != null && manager2.isConnected()) {
+        manager2.disconnect();
+      }
+      if (haController.getAggregationTaskRunner() != null) {
+        haController.getAggregationTaskRunner().stop();
+      }
+      if (haController.manager != null) {
+        haController.manager.disconnect();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
index eb7d750..d8b84ec 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
@@ -23,6 +23,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -42,6 +43,7 @@ import org.apache.ambari.server.orm.entities.PermissionEntity;
import org.apache.ambari.server.orm.entities.RoleAuthorizationEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
import org.apache.ambari.server.state.RepositoryType;
import org.apache.ambari.server.state.State;
import org.slf4j.Logger;
@@ -150,7 +152,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
dbAccessor.addColumn(CLUSTER_TABLE, new DBColumnInfo(CLUSTER_UPGRADE_ID_COLUMN, Long.class, null, null, true));
dbAccessor.addFKConstraint(CLUSTER_TABLE, "FK_clusters_upgrade_id",
- CLUSTER_UPGRADE_ID_COLUMN, UPGRADE_TABLE, "upgrade_id", false);
+ CLUSTER_UPGRADE_ID_COLUMN, UPGRADE_TABLE, "upgrade_id", false);
}
@Override
@@ -165,6 +167,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
setRoleSortOrder();
addSettingPermission();
addManageUserPersistedDataPermission();
+ updateAMSConfigs();
}
private void createSettingTable() throws SQLException {
@@ -485,7 +488,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
dbAccessor.executeUpdate(String.format(updateStatement,
6, PermissionEntity.CLUSTER_USER_PERMISSION_NAME));
dbAccessor.executeUpdate(String.format(updateStatement,
- 7, PermissionEntity.VIEW_USER_PERMISSION_NAME));
+ 7, PermissionEntity.VIEW_USER_PERMISSION_NAME));
}
/**
@@ -671,4 +674,30 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
dbAccessor.executeQuery("UPDATE " + HOST_ROLE_COMMAND_TABLE + " SET original_start_time=-1 WHERE original_start_time IS NULL");
dbAccessor.setColumnNullable(HOST_ROLE_COMMAND_TABLE, columnName, false);
}
+
+ /**
+  * Appends an {@code AMS_INSTANCE_NAME} export to the {@code content}
+  * property of each cluster's desired {@code ams-env} config, unless the
+  * content already mentions {@code AMS_INSTANCE_NAME}. Clusters without an
+  * {@code ams-env} config or without a {@code content} property are skipped.
+  *
+  * @throws AmbariException propagated from the cluster lookup or the
+  *         configuration update
+  */
+ protected void updateAMSConfigs() throws AmbariException {
+   AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+   Clusters clusters = ambariManagementController.getClusters();
+   if (clusters == null) {
+     return;
+   }
+
+   Map<String, Cluster> clusterMap = clusters.getClusters();
+   if (clusterMap == null || clusterMap.isEmpty()) {
+     return;
+   }
+
+   for (final Cluster cluster : clusterMap.values()) {
+     Config amsEnv = cluster.getDesiredConfigByType("ams-env");
+     if (amsEnv == null) {
+       continue;
+     }
+
+     String content = amsEnv.getProperties().get("content");
+     if (content != null && !content.contains("AMS_INSTANCE_NAME")) {
+       String newContent = content + "\n # AMS instance name\n" +
+         "export AMS_INSTANCE_NAME={{hostname}}\n";
+
+       // Update only the cluster whose content was read; the cluster-agnostic
+       // variant would push this cluster's template to every cluster.
+       updateConfigurationPropertiesForCluster(cluster, "ams-env",
+         Collections.singletonMap("content", newContent), true, true);
+     }
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
index 836e159..c027939 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
@@ -80,6 +80,9 @@
<value>
# Set environment variables here.
+# AMS instance name
+export AMS_INSTANCE_NAME={{hostname}}
+
# The java implementation to use. Java 1.6 required.
export JAVA_HOME={{java64_home}}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
index 59dbd84..f8131c0 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
@@ -29,7 +29,7 @@
<name>METRICS_COLLECTOR</name>
<displayName>Metrics Collector</displayName>
<category>MASTER</category>
- <cardinality>1</cardinality>
+ <cardinality>1+</cardinality>
<versionAdvertised>false</versionAdvertised>
<timelineAppid>AMS-HBASE</timelineAppid>
<recovery_enabled>true</recovery_enabled>
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
index ef2a96e..5cd3bdc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
@@ -282,6 +282,7 @@ public class UpgradeCatalog240Test {
Method updateAlerts = UpgradeCatalog240.class.getDeclaredMethod("updateAlerts");
Method addManageUserPersistedDataPermission = UpgradeCatalog240.class.getDeclaredMethod("addManageUserPersistedDataPermission");
Method addSettingPermission = UpgradeCatalog240.class.getDeclaredMethod("addSettingPermission");
+ Method updateAmsConfigs = UpgradeCatalog240.class.getDeclaredMethod("updateAMSConfigs");
Capture<String> capturedStatements = newCapture(CaptureType.ALL);
@@ -293,6 +294,7 @@ public class UpgradeCatalog240Test {
.addMockedMethod(updateAlerts)
.addMockedMethod(addSettingPermission)
.addMockedMethod(addManageUserPersistedDataPermission)
+ .addMockedMethod(updateAmsConfigs)
.createMock();
Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor");
@@ -302,6 +304,7 @@ public class UpgradeCatalog240Test {
upgradeCatalog240.updateAlerts();
upgradeCatalog240.addSettingPermission();
upgradeCatalog240.addManageUserPersistedDataPermission();
+ upgradeCatalog240.updateAMSConfigs();
replay(upgradeCatalog240, dbAccessor);
[2/2] ambari git commit: AMBARI-15623. Support distributed
aggregation for multiple AMS instances. (swagle)
Posted by sw...@apache.org.
AMBARI-15623. Support distributed aggregation for multiple AMS instances. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dfa4454e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dfa4454e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dfa4454e
Branch: refs/heads/trunk
Commit: dfa4454e7d69fb160924a3877d3f3f1a6314c7bd
Parents: c479fdd
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Wed Mar 30 11:35:07 2016 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Wed Mar 30 11:35:07 2016 -0700
----------------------------------------------------------------------
.../ambari-metrics-timelineservice/pom.xml | 29 +-
.../timeline/HBaseTimelineMetricStore.java | 70 ++++-
.../timeline/TimelineMetricConfiguration.java | 46 ++++
.../metrics/timeline/TimelineMetricStore.java | 6 +
.../aggregators/AbstractTimelineAggregator.java | 106 +++++--
.../aggregators/TimelineMetricAggregator.java | 26 +-
.../TimelineMetricAggregatorFactory.java | 100 ++++---
.../TimelineMetricClusterAggregator.java | 9 +-
.../TimelineMetricClusterAggregatorSecond.java | 13 +-
.../TimelineMetricHostAggregator.java | 11 +-
.../v2/TimelineMetricClusterAggregator.java | 11 +-
.../v2/TimelineMetricHostAggregator.java | 10 +-
.../availability/AggregationTaskRunner.java | 144 ++++++++++
.../availability/CheckpointManager.java | 98 +++++++
.../OnlineOfflineStateModelFactory.java | 69 +++++
.../TimelineMetricHAController.java | 276 +++++++++++++++++++
.../query/DefaultPhoenixDataSource.java | 2 +-
.../webapp/TimelineWebServices.java | 12 +
.../timeline/ITPhoenixHBaseAccessor.java | 12 +-
.../timeline/TestTimelineMetricStore.java | 5 +
.../AbstractTimelineAggregatorTest.java | 7 +-
.../aggregators/ITClusterAggregator.java | 20 +-
.../aggregators/ITMetricAggregator.java | 19 +-
...melineMetricClusterAggregatorSecondTest.java | 9 +-
.../TimelineMetricHAControllerTest.java | 107 +++++++
.../server/upgrade/UpgradeCatalog240.java | 33 ++-
.../0.1.0/configuration/ams-env.xml | 3 +
.../AMBARI_METRICS/0.1.0/metainfo.xml | 2 +-
.../server/upgrade/UpgradeCatalog240Test.java | 3 +
29 files changed, 1112 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index b435964..f180715 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -34,9 +34,9 @@
<!-- Needed for generating FindBugs warnings using parent pom -->
<!--<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>-->
<protobuf.version>2.5.0</protobuf.version>
- <hadoop.version>2.7.1.2.3.4.0-3347</hadoop.version>
- <phoenix.version>4.4.0.2.3.4.0-3347</phoenix.version>
- <hbase.version>1.1.2.2.3.4.0-3347</hbase.version>
+ <hadoop.version>[2.7.1.2.5.0.0,2.7.1.2.5.0.0-9999)</hadoop.version>
+ <phoenix.version>[4.4.0.2.5.0.0,4.4.0.2.5.0.0-9999)</phoenix.version>
+ <hbase.version>[1.1.2.2.5.0.0,1.1.2.2.5.0.0-9999)</hbase.version>
</properties>
<build>
@@ -249,6 +249,25 @@
</build>
<dependencies>
+
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6.5</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ <version>3.4.8</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
@@ -565,6 +584,10 @@
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index a32e206..a5204e1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
@@ -46,11 +48,11 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore {
@@ -58,8 +60,10 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
private final TimelineMetricConfiguration configuration;
private PhoenixHBaseAccessor hBaseAccessor;
private static volatile boolean isInitialized = false;
- private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor();
+ private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>();
private TimelineMetricMetadataManager metricMetadataManager;
+ private TimelineMetricHAController haController;
/**
* Construct the service.
@@ -87,6 +91,18 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
metricMetadataManager.initializeMetadata();
// Initialize policies before TTL update
hBaseAccessor.initPoliciesAndTTL();
+ // Start HA service
+ if (configuration.isDistributedOperationModeEnabled()) {
+ // Start the controller
+ haController = new TimelineMetricHAController(configuration);
+ try {
+ haController.initializeHAController();
+ } catch (Exception e) {
+ LOG.error(e);
+ throw new MetricsSystemInitializationException("Unable to " +
+ "initialize HA controller", e);
+ }
+ }
if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
@@ -94,46 +110,53 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
// Start the cluster aggregator second
TimelineMetricAggregator secondClusterAggregator =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager);
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
scheduleAggregatorThread(secondClusterAggregator);
// Start the minute cluster aggregator
TimelineMetricAggregator minuteClusterAggregator =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(minuteClusterAggregator);
// Start the hourly cluster aggregator
TimelineMetricAggregator hourlyClusterAggregator =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(hourlyClusterAggregator);
// Start the daily cluster aggregator
TimelineMetricAggregator dailyClusterAggregator =
- TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(dailyClusterAggregator);
// Start the minute host aggregator
TimelineMetricAggregator minuteHostAggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(minuteHostAggregator);
// Start the hourly host aggregator
TimelineMetricAggregator hourlyHostAggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(hourlyHostAggregator);
// Start the daily host aggregator
TimelineMetricAggregator dailyHostAggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
+ hBaseAccessor, metricsConf, haController);
scheduleAggregatorThread(dailyHostAggregator);
if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
int delay = configuration.getTimelineMetricsServiceWatcherDelay();
// Start the watchdog
- executorService.scheduleWithFixedDelay(
- new TimelineMetricStoreWatcher(this, configuration), initDelay, delay,
- TimeUnit.SECONDS);
+ watchdogExecutorService.scheduleWithFixedDelay(
+ new TimelineMetricStoreWatcher(this, configuration),
+ initDelay, delay, TimeUnit.SECONDS);
LOG.info("Started watchdog for timeline metrics store with initial " +
"delay = " + initDelay + ", delay = " + delay);
}
@@ -333,13 +356,30 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
return metricMetadataManager.getHostedAppsCache();
}
- private void scheduleAggregatorThread(TimelineMetricAggregator aggregator) {
- ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ /**
+  * Returns the host names of the live collector instances reported by the
+  * HA controller.
+  *
+  * @return live instance host names, or an empty list when no HA controller
+  *         exists (it is only created in distributed operation mode)
+  */
+ @Override
+ public List<String> getLiveInstances() {
+   // The controller is only instantiated when distributed operation mode is
+   // enabled; without this guard the call would throw a NullPointerException.
+   if (haController == null) {
+     return java.util.Collections.<String>emptyList();
+   }
+   return haController.getLiveInstanceHostNames();
+ }
+
+ /**
+ * Schedules the given aggregator at a fixed rate on its own single-threaded
+ * executor, unless the aggregator reports itself as disabled.
+ * The executor thread is named via ACTUAL_AGGREGATOR_NAMES and the executor
+ * is recorded in scheduledExecutors under the aggregator's name.
+ *
+ * @param aggregator aggregator to run every getSleepIntervalMillis()
+ * milliseconds, starting without initial delay
+ */
+ private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) {
if (!aggregator.isDisabled()) {
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ // Name the worker thread after the aggregator it runs
+ return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName()));
+ }
+ }
+ );
+ scheduledExecutors.put(aggregator.getName(), executorService);
executorService.scheduleAtFixedRate(aggregator,
0l,
aggregator.getSleepIntervalMillis(),
TimeUnit.MILLISECONDS);
+ LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
+ aggregator.getSleepIntervalMillis() + " milliseconds.");
+ } else {
+ LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index e57f02d..90c1d78 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -23,9 +23,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
+import java.net.UnknownHostException;
/**
* Configuration class that reads properties from ams-site.xml. All values
@@ -38,6 +40,7 @@ public class TimelineMetricConfiguration {
public static final String HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml";
public static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+ public static final String METRICS_ENV_CONFIGURATION_FILE = "ams-env.xml";
public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
"timeline.metrics.aggregator.checkpoint.dir";
@@ -221,8 +224,11 @@ public class TimelineMetricConfiguration {
public static final String HOST_APP_ID = "HOST";
+ public static final String DEFAULT_INSTANCE_PORT = "12001";
+
private Configuration hbaseConf;
private Configuration metricsConf;
+ private Configuration amsEnvConf;
private volatile boolean isInitialized = false;
public void initialize() throws URISyntaxException, MalformedURLException {
@@ -249,6 +255,7 @@ public class TimelineMetricConfiguration {
hbaseConf.addResource(hbaseResUrl.toURI().toURL());
metricsConf = new Configuration(true);
metricsConf.addResource(amsResUrl.toURI().toURL());
+
isInitialized = true;
}
@@ -266,6 +273,37 @@ public class TimelineMetricConfiguration {
return metricsConf;
}
+ /**
+  * Reads the ZooKeeper client port from the HBase configuration, loading the
+  * configuration first if that has not happened yet.
+  *
+  * @return trimmed value of {@code hbase.zookeeper.property.clientPort},
+  *         or {@code "2181"} when the property is not set
+  */
+ public String getZKClientPort() throws MalformedURLException, URISyntaxException {
+   if (!isInitialized) {
+     initialize();
+   }
+   String clientPort = hbaseConf.getTrimmed("hbase.zookeeper.property.clientPort", "2181");
+   return clientPort;
+ }
+
+ /**
+  * Reads the ZooKeeper quorum from the HBase configuration, loading the
+  * configuration first if that has not happened yet.
+  *
+  * @return trimmed value of {@code hbase.zookeeper.quorum}; no default is
+  *         supplied here
+  */
+ public String getZKQuorum() throws MalformedURLException, URISyntaxException {
+   if (!isInitialized) {
+     initialize();
+   }
+   String quorum = hbaseConf.getTrimmed("hbase.zookeeper.quorum");
+   return quorum;
+ }
+
+ /**
+  * Resolves the name of this collector instance.
+  * <p/>
+  * Despite the method name, the value is read from the JVM system property
+  * {@code AMS_INSTANCE_NAME} (not from the process environment); when that
+  * property is absent the local host name is used instead.
+  *
+  * @return the configured instance name, or the local host name as fallback
+  * @throws UnknownHostException if the fallback host name lookup fails
+  */
+ public String getInstanceHostnameFromEnv() throws UnknownHostException {
+   String configuredName = System.getProperty("AMS_INSTANCE_NAME");
+   if (configuredName != null) {
+     return configuredName;
+   }
+   return InetAddress.getLocalHost().getHostName();
+ }
+
+ /**
+  * Resolves the port of this collector instance.
+  * <p/>
+  * Lookup order: the JVM system property {@code AMS_INSTANCE_PORT}, then the
+  * metrics configuration key
+  * {@code timeline.metrics.availability.instance.port}, then
+  * {@link #DEFAULT_INSTANCE_PORT}.
+  *
+  * @return the instance port as a string
+  */
+ public String getInstancePort() throws MalformedURLException, URISyntaxException {
+   String amsInstancePort = System.getProperty("AMS_INSTANCE_PORT");
+   if (amsInstancePort != null) {
+     // An explicit override wins; previously it was discarded in favour of
+     // the default port.
+     return amsInstancePort;
+   }
+   // Check config
+   return getMetricsConf().get("timeline.metrics.availability.instance.port", DEFAULT_INSTANCE_PORT);
+ }
+
public String getWebappAddress() {
String defaultHttpAddress = "0.0.0.0:6188";
if (metricsConf != null) {
@@ -323,4 +361,12 @@ public class TimelineMetricConfiguration {
}
return defaultRpcAddress;
}
+
+ /**
+  * Tells whether the collector is configured to run in distributed mode.
+  *
+  * @return {@code true} only when the metrics configuration key
+  *         {@code timeline.metrics.service.operation.mode} equals
+  *         {@code "distributed"}; {@code false} when the key is unset, has
+  *         another value, or the configuration cannot be loaded
+  */
+ public boolean isDistributedOperationModeEnabled() {
+   try {
+     // Constant-first equals: an unset property yields false directly
+     // instead of relying on a caught NullPointerException.
+     return "distributed".equals(getMetricsConf().get("timeline.metrics.service.operation.mode"));
+   } catch (Exception e) {
+     // Best effort: a configuration that cannot be loaded means "not distributed".
+     return false;
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index 0aa102e..2f08f3f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -87,4 +87,10 @@ public interface TimelineMetricStore {
* @throws IOException
*/
Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException;
+
+ /**
+ * Return the collector nodes that are currently known to be live.
+ * NOTE(review): how liveness is determined is up to the implementation
+ * and is not visible from this interface.
+ *
+ * @return list of hostnames, one entry per known live collector node
+ */
+ List<String> getLiveInstances();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index ba7807b..ae87cf1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.slf4j.LoggerFactory;
@@ -34,6 +37,7 @@ import java.util.Date;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
/**
* Base class for all runnable aggregators. Provides common functions like
@@ -52,11 +56,12 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
protected String tableName;
protected String outputTableName;
protected Long nativeTimeRangeDelay;
+ protected AggregationTaskRunner taskRunner;
// Explicitly name aggregators for logging needs
- private final String aggregatorName;
+ private final AGGREGATOR_NAME aggregatorName;
- AbstractTimelineAggregator(String aggregatorName,
+ AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf) {
this.aggregatorName = aggregatorName;
@@ -64,10 +69,10 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
this.metricsConf = metricsConf;
this.checkpointDelayMillis = SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
- this.LOG = LoggerFactory.getLogger(aggregatorName);
+ this.LOG = LoggerFactory.getLogger(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName));
}
- public AbstractTimelineAggregator(String aggregatorName,
+ public AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -76,7 +81,8 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
String aggregatorDisableParam,
String tableName,
String outputTableName,
- Long nativeTimeRangeDelay) {
+ Long nativeTimeRangeDelay,
+ TimelineMetricHAController haController) {
this(aggregatorName, hBaseAccessor, metricsConf);
this.checkpointLocation = checkpointLocation;
this.sleepIntervalMillis = sleepIntervalMillis;
@@ -84,7 +90,9 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
this.aggregatorDisableParam = aggregatorDisableParam;
this.tableName = tableName;
this.outputTableName = outputTableName;
- this.nativeTimeRangeDelay = nativeTimeRangeDelay;
+ this.nativeTimeRangeDelay = nativeTimeRangeDelay;
+ this.taskRunner = haController != null && haController.isInitialized() ?
+ haController.getAggregationTaskRunner() : null;
}
@Override
@@ -98,25 +106,39 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
* Access relaxed for tests
*/
public void runOnce(Long SLEEP_INTERVAL) {
+ // One aggregation pass: doWork() is called for the window that starts at
+ // the last checkpoint and is SLEEP_INTERVAL long; the checkpoint is only
+ // advanced when doWork() reports success.
+ //
+ // When a task runner is present it decides, per aggregator type, whether
+ // this instance performs the aggregation. Without one the pass always runs.
+ // NOTE(review): getAggregatorType() can return null, in which case the
+ // switch below throws a NullPointerException - confirm that every output
+ // table name maps to a type.
+ boolean performAggregationFunction = true;
+ if (taskRunner != null) {
+ switch (getAggregatorType()) {
+ case HOST:
+ performAggregationFunction = taskRunner.performsHostAggregation();
+ break;
+ case CLUSTER:
+ performAggregationFunction = taskRunner.performsClusterAggregation();
+ }
+ }
- long currentTime = System.currentTimeMillis();
- long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
-
- if (lastCheckPointTime != -1) {
- LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
- + ((currentTime - lastCheckPointTime) / 1000)
- + " seconds.");
-
- boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
+ if (performAggregationFunction) {
+ long currentTime = System.currentTimeMillis();
+ long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
- if (success) {
- try {
- saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
- } catch (IOException io) {
- LOG.warn("Error saving checkpoint, restarting aggregation at " +
- "previous checkpoint.");
+ // NOTE(review): -1 presumably signals that no usable checkpoint was
+ // available - confirm in readLastCheckpointSavingOnFirstRun().
+ if (lastCheckPointTime != -1) {
+ LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
+ + ((currentTime - lastCheckPointTime) / 1000)
+ + " seconds.");
+
+ boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
+
+ // A failed checkpoint write is only logged, so the next pass starts
+ // again from the previous checkpoint.
+ if (success) {
+ try {
+ saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
+ } catch (IOException io) {
+ LOG.warn("Error saving checkpoint, restarting aggregation at " +
+ "previous checkpoint.");
+ }
}
}
+ } else {
+ LOG.info("Skipping aggregation function not owned by this instance.");
}
}
@@ -174,6 +196,9 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
}
protected long readCheckPoint() {
+ if (taskRunner != null) {
+ return taskRunner.getCheckpointManager().readCheckpoint(aggregatorName);
+ }
try {
File checkpoint = new File(getCheckpointLocation());
if (checkpoint.exists()) {
@@ -189,15 +214,23 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
}
+ /**
+ * Persist the checkpoint time for this aggregator.
+ *
+ * With a task runner present the value is handed to its checkpoint manager;
+ * a failed write there is logged as an error and not thrown. Otherwise the
+ * value is written as text to the checkpoint file, which is created first
+ * when it does not exist.
+ *
+ * @param checkpointTime checkpoint value to store
+ * @throws IOException if the checkpoint file cannot be created or written
+ */
protected void saveCheckPoint(long checkpointTime) throws IOException {
- File checkpoint = new File(getCheckpointLocation());
- if (!checkpoint.exists()) {
- boolean done = checkpoint.createNewFile();
- if (!done) {
- throw new IOException("Could not create checkpoint at location, " +
- getCheckpointLocation());
+ if (taskRunner != null) {
+ boolean success = taskRunner.getCheckpointManager().writeCheckpoint(aggregatorName, checkpointTime);
+ if (!success) {
+ // Separator added so the aggregator name and the value do not run together.
+ LOG.error("Error saving checkpoint with AggregationTaskRunner, " +
+ "aggregator = " + aggregatorName + ", value = " + checkpointTime);
}
+ } else {
+ File checkpoint = new File(getCheckpointLocation());
+ if (!checkpoint.exists()) {
+ boolean done = checkpoint.createNewFile();
+ if (!done) {
+ throw new IOException("Could not create checkpoint at location, " +
+ getCheckpointLocation());
+ }
+ }
+ FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
}
- FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
}
/**
@@ -317,4 +350,21 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
return currentTime - (currentTime % aggregatorPeriod);
}
+ /**
+  * Derives the aggregator type from the name of the output table.
+  * Used by the HA controller to pick the lock to acquire.
+  *
+  * @return {@code HOST} when the output table name contains "RECORD",
+  *         {@code CLUSTER} when it contains "AGGREGATE", otherwise
+  *         {@code null}
+  */
+ public AGGREGATOR_TYPE getAggregatorType() {
+   // "RECORD" is tested first, so a name containing both markers maps to HOST.
+   if (outputTableName.contains("RECORD")) {
+     return AGGREGATOR_TYPE.HOST;
+   }
+   return outputTableName.contains("AGGREGATE") ? AGGREGATOR_TYPE.CLUSTER : null;
+ }
+
+ /**
+  * @return the aggregator name this instance was constructed with
+  */
+ @Override
+ public AGGREGATOR_NAME getName() {
+   return this.aggregatorName;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
index 295db0e..150e3f1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
@@ -1,5 +1,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,22 +22,38 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
public interface TimelineMetricAggregator extends Runnable {
/**
* Aggregate metric data within the time bounds.
+ *
* @param startTime start time millis
- * @param endTime end time millis
+ * @param endTime end time millis
* @return success
*/
- public boolean doWork(long startTime, long endTime);
+ boolean doWork(long startTime, long endTime);
/**
* Is aggregator is disabled by configuration.
+ *
* @return true/false
*/
- public boolean isDisabled();
+ boolean isDisabled();
/**
* Return aggregator Interval
+ *
* @return Interval in Millis
*/
- public Long getSleepIntervalMillis();
+ Long getSleepIntervalMillis();
+
+ /**
+ * Get the name that identifies this aggregator.
+ *
+ * @return the {@link AGGREGATOR_NAME} of this aggregator
+ */
+ AGGREGATOR_NAME getName();
+ /**
+ * Known aggregator types. AbstractTimelineAggregator reports HOST for
+ * aggregators whose output table name contains "RECORD" and CLUSTER for
+ * those whose output table name contains "AGGREGATE".
+ */
+ enum AGGREGATOR_TYPE {
+ // Aggregation gated by performsClusterAggregation() on the task runner.
+ CLUSTER,
+ // Aggregation gated by performsHostAggregation() on the task runner.
+ HOST
}
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index cc85c56..4c44f9e 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -29,12 +30,12 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
@@ -48,6 +49,13 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
@@ -86,7 +94,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 5 mins
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
- (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -104,7 +113,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorMinute",
+ METRIC_RECORD_MINUTE,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -112,12 +121,13 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
return new TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorMinute",
+ METRIC_RECORD_MINUTE,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -125,7 +135,8 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l);
+ 120000l,
+ haController);
}
/**
@@ -133,7 +144,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 1 hour
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
- (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -151,7 +163,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorHourly",
+ METRIC_RECORD_HOURLY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -159,12 +171,13 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 3600000l
+ 3600000l,
+ haController
);
}
return new TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorHourly",
+ METRIC_RECORD_HOURLY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -172,7 +185,8 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 3600000l);
+ 3600000l,
+ haController);
}
/**
@@ -180,7 +194,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 1 day
*/
public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
- (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -198,7 +213,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorDaily",
+ METRIC_RECORD_DAILY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -206,12 +221,13 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 3600000l
+ 3600000l,
+ haController
);
}
return new TimelineMetricHostAggregator(
- "TimelineMetricHostAggregatorDaily",
+ METRIC_RECORD_DAILY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -219,7 +235,8 @@ public class TimelineMetricAggregatorFactory {
hostAggregatorDisabledParam,
inputTableName,
outputTableName,
- 3600000l);
+ 3600000l,
+ haController);
}
/**
@@ -229,7 +246,8 @@ public class TimelineMetricAggregatorFactory {
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
- TimelineMetricMetadataManager metadataManager) {
+ TimelineMetricMetadataManager metadataManager,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -252,7 +270,7 @@ public class TimelineMetricAggregatorFactory {
// Second based aggregation have added responsibility of time slicing
return new TimelineMetricClusterAggregatorSecond(
- "TimelineClusterAggregatorSecond",
+ METRIC_AGGREGATE_SECOND,
metadataManager,
hBaseAccessor, metricsConf,
checkpointLocation,
@@ -262,7 +280,8 @@ public class TimelineMetricAggregatorFactory {
inputTableName,
outputTableName,
120000l,
- timeSliceIntervalMillis
+ timeSliceIntervalMillis,
+ haController
);
}
@@ -271,7 +290,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 5 mins
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
- PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -291,7 +311,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorMinute",
+ METRIC_AGGREGATE_MINUTE,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -299,12 +319,13 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
return new TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorMinute",
+ METRIC_AGGREGATE_MINUTE,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -312,7 +333,8 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
@@ -321,7 +343,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 1 hour
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
- PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -341,7 +364,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorHourly",
+ METRIC_AGGREGATE_HOURLY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -349,12 +372,13 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
return new TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorHourly",
+ METRIC_AGGREGATE_HOURLY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -362,7 +386,8 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
@@ -371,7 +396,8 @@ public class TimelineMetricAggregatorFactory {
* Interval : 1 day
*/
public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
- PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+ PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+ TimelineMetricHAController haController) {
String checkpointDir = metricsConf.get(
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -391,7 +417,7 @@ public class TimelineMetricAggregatorFactory {
if (useGroupByAggregator(metricsConf)) {
return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorDaily",
+ METRIC_AGGREGATE_DAILY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -399,12 +425,13 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
return new TimelineMetricClusterAggregator(
- "TimelineClusterAggregatorDaily",
+ METRIC_AGGREGATE_DAILY,
hBaseAccessor, metricsConf,
checkpointLocation,
sleepIntervalMillis,
@@ -412,7 +439,8 @@ public class TimelineMetricAggregatorFactory {
aggregatorDisabledParam,
inputTableName,
outputTableName,
- 120000l
+ 120000l,
+ haController
);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index f90b01f..6438256 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
@@ -36,7 +38,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true);
private final boolean isClusterPrecisionInputTable;
- public TimelineMetricClusterAggregator(String aggregatorName,
+ public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -45,11 +47,12 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
String hostAggregatorDisabledParam,
String inputTableName,
String outputTableName,
- Long nativeTimeRangeDelay) {
+ Long nativeTimeRangeDelay,
+ TimelineMetricHAController haController) {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier,
hostAggregatorDisabledParam, inputTableName, outputTableName,
- nativeTimeRangeDelay);
+ nativeTimeRangeDelay, haController);
isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index e0e065b..9c11d39 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -22,10 +22,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -39,7 +41,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
/**
* Aggregates a metric across all hosts in the cluster. Reads metrics from
@@ -54,8 +55,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
private final Long serverTimeShiftAdjustment;
private final boolean interpolationEnabled;
-
- public TimelineMetricClusterAggregatorSecond(String aggregatorName,
+ public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
TimelineMetricMetadataManager metadataManager,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
@@ -66,10 +66,11 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
String tableName,
String outputTableName,
Long nativeTimeRangeDelay,
- Long timeSliceInterval) {
+ Long timeSliceInterval,
+ TimelineMetricHAController haController) {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
- tableName, outputTableName, nativeTimeRangeDelay);
+ tableName, outputTableName, nativeTimeRangeDelay, haController);
appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf);
this.timeSliceIntervalMillis = timeSliceInterval;
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index 26e73b0..364a4b5 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -22,20 +22,24 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
+
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
- public TimelineMetricHostAggregator(String aggregatorName,
+ public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -44,10 +48,11 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
String hostAggregatorDisabledParam,
String tableName,
String outputTableName,
- Long nativeTimeRangeDelay) {
+ Long nativeTimeRangeDelay,
+ TimelineMetricHAController haController) {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
- tableName, outputTableName, nativeTimeRangeDelay);
+ tableName, outputTableName, nativeTimeRangeDelay, haController);
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
index c056d79..aeddf06 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -20,19 +20,23 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
+
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
+
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
private final String aggregateColumnName;
- public TimelineMetricClusterAggregator(String aggregatorName,
+ public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -41,11 +45,12 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
String hostAggregatorDisabledParam,
String inputTableName,
String outputTableName,
- Long nativeTimeRangeDelay) {
+ Long nativeTimeRangeDelay,
+ TimelineMetricHAController haController) {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier,
hostAggregatorDisabledParam, inputTableName, outputTableName,
- nativeTimeRangeDelay);
+ nativeTimeRangeDelay, haController);
if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
aggregateColumnName = "HOSTS_COUNT";
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
index 118c695..0df8329 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
+
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -31,7 +34,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
- public TimelineMetricHostAggregator(String aggregatorName,
+ public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf,
String checkpointLocation,
@@ -40,10 +43,11 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
String hostAggregatorDisabledParam,
String tableName,
String outputTableName,
- Long nativeTimeRangeDelay) {
+ Long nativeTimeRangeDelay,
+ TimelineMetricHAController haController) {
super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
- tableName, outputTableName, nativeTimeRangeDelay);
+ tableName, outputTableName, nativeTimeRangeDelay, haController);
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
new file mode 100644
index 0000000..4a1f17b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.participant.StateMachineEngine;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.HOST;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME;
+
+/**
+ * Registers this collector instance as a Helix participant and records which
+ * aggregation functions (host and/or cluster) the instance currently owns.
+ * The ownership flags are flipped by the state model transitions through
+ * {@link #setPartitionAggregationFunction(AGGREGATOR_TYPE)} and
+ * {@link #unsetPartitionAggregationFunction(AGGREGATOR_TYPE)}.
+ */
+public class AggregationTaskRunner {
+  private final String instanceName;
+  private final String zkAddress;
+  // Assigned in initialize(); null until then.
+  private HelixManager manager;
+  private static final Log LOG = LogFactory.getLog(AggregationTaskRunner.class);
+  // Assigned in initialize(); null until then.
+  private CheckpointManager checkpointManager;
+  // Map partition name to an aggregator dimension
+  static final Map<String, AGGREGATOR_TYPE> PARTITION_AGGREGATION_TYPES = new HashMap<>();
+  // Ownership flags to be set by the State transitions
+  private final AtomicBoolean performsClusterAggregation = new AtomicBoolean(false);
+  private final AtomicBoolean performsHostAggregation = new AtomicBoolean(false);
+
+  /**
+   * Logical identifiers of the aggregators; mapped to their actual names
+   * through {@link #ACTUAL_AGGREGATOR_NAMES}.
+   */
+  public enum AGGREGATOR_NAME {
+    METRIC_RECORD_MINUTE,
+    METRIC_RECORD_HOURLY,
+    METRIC_RECORD_DAILY,
+    METRIC_AGGREGATE_SECOND,
+    METRIC_AGGREGATE_MINUTE,
+    METRIC_AGGREGATE_HOURLY,
+    METRIC_AGGREGATE_DAILY,
+  }
+
+  // Aggregator identifier to the aggregator's actual name
+  public static final Map<AGGREGATOR_NAME, String> ACTUAL_AGGREGATOR_NAMES = new HashMap<>();
+
+  static {
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_MINUTE, "TimelineMetricHostAggregatorMinute");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_HOURLY, "TimelineMetricHostAggregatorHourly");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_DAILY, "TimelineMetricHostAggregatorDaily");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_SECOND, "TimelineClusterAggregatorSecond");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_MINUTE, "TimelineClusterAggregatorMinute");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_HOURLY, "TimelineClusterAggregatorHourly");
+    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_DAILY, "TimelineClusterAggregatorDaily");
+
+    // Partition name to task assignment
+    PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_0", CLUSTER);
+    PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_1", HOST);
+  }
+
+  /**
+   * @param instanceName name under which this participant joins the cluster
+   * @param zkAddress zookeeper address handed to the Helix manager
+   */
+  public AggregationTaskRunner(String instanceName, String zkAddress) {
+    this.instanceName = instanceName;
+    this.zkAddress = zkAddress;
+  }
+
+  /**
+   * Creates the participant Helix manager, registers the online/offline state
+   * model factory, connects, and then creates the checkpoint manager on top
+   * of the manager's property store.
+   *
+   * @throws Exception propagated from the Helix calls made here
+   */
+  public void initialize() throws Exception {
+    manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceName,
+      InstanceType.PARTICIPANT, zkAddress);
+
+    OnlineOfflineStateModelFactory stateModelFactory =
+      new OnlineOfflineStateModelFactory(instanceName, this);
+
+    // The factory must be registered before connecting.
+    StateMachineEngine stateMach = manager.getStateMachineEngine();
+    stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory);
+    manager.connect();
+
+    checkpointManager = new CheckpointManager(manager.getHelixPropertyStore());
+  }
+
+  /**
+   * @return true while this instance owns the cluster aggregation function
+   */
+  public boolean performsClusterAggregation() {
+    return performsClusterAggregation.get();
+  }
+
+  /**
+   * @return true while this instance owns the host aggregation function
+   */
+  public boolean performsHostAggregation() {
+    return performsHostAggregation.get();
+  }
+
+  /**
+   * @return the checkpoint manager, or null if {@link #initialize()} has not
+   * completed
+   */
+  public CheckpointManager getCheckpointManager() {
+    return checkpointManager;
+  }
+
+  /**
+   * Marks this instance as the owner of the given aggregation function.
+   *
+   * @param type aggregation dimension to take ownership of; a null value is
+   *             logged and ignored
+   */
+  public void setPartitionAggregationFunction(AGGREGATOR_TYPE type) {
+    if (type == null) {
+      // A switch on a null enum throws NullPointerException; callers pass the
+      // raw result of a map lookup, which is null for an unmapped partition.
+      LOG.warn("Ignoring request to set aggregator function with no " +
+        "aggregator type for : " + instanceName);
+      return;
+    }
+    switch (type) {
+      case HOST:
+        performsHostAggregation.set(true);
+        LOG.info("Set host aggregator function for : " + instanceName);
+        break;
+      case CLUSTER:
+        performsClusterAggregation.set(true);
+        LOG.info("Set cluster aggregator function for : " + instanceName);
+        break;
+      default:
+        // No ownership flag exists for any other type; nothing to update.
+        break;
+    }
+  }
+
+  /**
+   * Releases this instance's ownership of the given aggregation function.
+   *
+   * @param type aggregation dimension to release; a null value is logged and
+   *             ignored
+   */
+  public void unsetPartitionAggregationFunction(AGGREGATOR_TYPE type) {
+    if (type == null) {
+      // Same null hazard as in setPartitionAggregationFunction.
+      LOG.warn("Ignoring request to unset aggregator function with no " +
+        "aggregator type for : " + instanceName);
+      return;
+    }
+    switch (type) {
+      case HOST:
+        performsHostAggregation.set(false);
+        LOG.info("Unset host aggregator function for : " + instanceName);
+        break;
+      case CLUSTER:
+        performsClusterAggregation.set(false);
+        LOG.info("Unset cluster aggregator function for : " + instanceName);
+        break;
+      default:
+        // No ownership flag exists for any other type; nothing to update.
+        break;
+    }
+  }
+
+  /**
+   * Disconnect participant before controller shutdown
+   */
+  void stop() {
+    // manager is only assigned in initialize(); tolerate a stop() without it.
+    if (manager != null) {
+      manager.disconnect();
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
new file mode 100644
index 0000000..439102f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
+
+/**
+ * Reads and writes aggregator checkpoints as {@link ZNRecord}s through the
+ * Helix property store. Each aggregator gets its own record, located by
+ * {@link #getCheckpointZKPath(AGGREGATOR_NAME)}, with the timestamp held in
+ * the {@link #ZNODE_FIELD} field.
+ */
+public class CheckpointManager {
+  private static final Log LOG = LogFactory.getLog(CheckpointManager.class);
+
+  static final String ZNODE_FIELD = "checkpoint";
+  static final String CHECKPOINT_PATH_PREFIX = "CHECKPOINTS";
+
+  private final ZkHelixPropertyStore<ZNRecord> propertyStore;
+
+  /**
+   * @param propertyStore store used for all checkpoint reads and updates
+   */
+  public CheckpointManager(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    this.propertyStore = propertyStore;
+  }
+
+  /**
+   * Read aggregator checkpoint from zookeeper
+   *
+   * @param aggregatorName aggregator whose checkpoint is requested
+   * @return the stored timestamp, or -1 when no record exists at the
+   * checkpoint path
+   */
+  public long readCheckpoint(AGGREGATOR_NAME aggregatorName) {
+    final String zkPath = getCheckpointZKPath(aggregatorName);
+    LOG.debug("Reading checkpoint at " + zkPath);
+
+    final Stat nodeStat = new Stat();
+    final ZNRecord record = propertyStore.get(zkPath, nodeStat, AccessOption.PERSISTENT);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Stat => " + nodeStat);
+    }
+
+    // -1 doubles as the "no record" result and the default for a missing field.
+    long timestamp = -1;
+    if (record != null) {
+      timestamp = record.getLongField(ZNODE_FIELD, -1);
+    }
+    LOG.debug("Checkpoint value = " + timestamp);
+    return timestamp;
+  }
+
+  /**
+   * Write aggregator checkpoint in zookeeper
+   *
+   * @param aggregatorName aggregator whose checkpoint is being saved
+   * @param value timestamp
+   * @return success flag reported by the property store update
+   */
+  public boolean writeCheckpoint(AGGREGATOR_NAME aggregatorName, long value) {
+    final String zkPath = getCheckpointZKPath(aggregatorName);
+    LOG.debug(String.format("Saving checkpoint at %s with value %s", zkPath, value));
+    final CheckpointDataUpdater updater = new CheckpointDataUpdater(zkPath, value);
+    return propertyStore.update(zkPath, updater, AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Builds the property store path of an aggregator's checkpoint record:
+   * a slash, the checkpoint prefix, a slash, then the aggregator's actual
+   * name as registered in ACTUAL_AGGREGATOR_NAMES.
+   *
+   * @param aggregatorName aggregator to build the path for
+   * @return checkpoint path
+   */
+  String getCheckpointZKPath(AGGREGATOR_NAME aggregatorName) {
+    return "/" + CHECKPOINT_PATH_PREFIX + "/" + ACTUAL_AGGREGATOR_NAMES.get(aggregatorName);
+  }
+
+  /**
+   * Updater that stores a checkpoint value in the record it is given,
+   * creating a new record for the path when none exists yet.
+   */
+  static class CheckpointDataUpdater implements DataUpdater<ZNRecord> {
+    final String path;
+    final Long value;
+
+    public CheckpointDataUpdater(String path, Long value) {
+      this.path = path;
+      this.value = value;
+    }
+
+    @Override
+    public ZNRecord update(ZNRecord currentData) {
+      // Start from the existing record when there is one; otherwise create it.
+      final ZNRecord target = (currentData != null) ? currentData : new ZNRecord(path);
+      target.setLongField(ZNODE_FIELD, value);
+      return target;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa4454e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000..7d3350b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES;
+
+/**
+ * Creates the state models that translate partition state transitions into
+ * ownership changes on the {@link AggregationTaskRunner}: going online sets
+ * the partition's aggregation function, going offline or being dropped
+ * unsets it.
+ */
+public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  private static final Log LOG = LogFactory.getLog(OnlineOfflineStateModelFactory.class);
+  private final String instanceName;
+  private final AggregationTaskRunner taskRunner;
+
+  /**
+   * @param instanceName name of this participant, used in log messages
+   * @param taskRunner runner whose ownership flags the state models update
+   */
+  public OnlineOfflineStateModelFactory(String instanceName, AggregationTaskRunner taskRunner) {
+    this.instanceName = instanceName;
+    this.taskRunner = taskRunner;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partition) {
+    LOG.info("Received request to process partition = " + partition + ", for " +
+      "resource = " + resourceName + ", at " + instanceName);
+    return new OnlineOfflineStateModel();
+  }
+
+  /**
+   * Resolves the aggregation type assigned to a partition.
+   *
+   * @param partitionName partition named in the transition message
+   * @return the mapped type, or null (after logging a warning) when the
+   * partition has no entry in PARTITION_AGGREGATION_TYPES
+   */
+  private AGGREGATOR_TYPE lookupAggregatorType(String partitionName) {
+    AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName);
+    if (type == null) {
+      LOG.warn("No aggregator type mapped to partition: " + partitionName +
+        ", at " + instanceName + ". Ignoring transition.");
+    }
+    return type;
+  }
+
+  /**
+   * State model whose transition handlers follow the
+   * onBecome&lt;To&gt;From&lt;From&gt; naming pattern.
+   */
+  public class OnlineOfflineStateModel extends StateModel {
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      String partitionName = message.getPartitionName();
+      LOG.info("Received transition to Online from Offline for partition: " + partitionName);
+      AGGREGATOR_TYPE type = lookupAggregatorType(partitionName);
+      // An unmapped partition has no function to assign; passing null on
+      // would fail inside the task runner's switch.
+      if (type != null) {
+        taskRunner.setPartitionAggregationFunction(type);
+      }
+    }
+
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      String partitionName = message.getPartitionName();
+      LOG.info("Received transition to Offline from Online for partition: " + partitionName);
+      AGGREGATOR_TYPE type = lookupAggregatorType(partitionName);
+      if (type != null) {
+        taskRunner.unsetPartitionAggregationFunction(type);
+      }
+    }
+
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      String partitionName = message.getPartitionName();
+      LOG.info("Received transition to Dropped from Offline for partition: " + partitionName);
+      AGGREGATOR_TYPE type = lookupAggregatorType(partitionName);
+      if (type != null) {
+        taskRunner.unsetPartitionAggregationFunction(type);
+      }
+    }
+  }
+}
\ No newline at end of file