You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by wu...@apache.org on 2022/11/17 09:27:53 UTC
[ambari-metrics] branch master updated: AMBARI-25635: Clear Cluster and METRIC_AGGREGATORS MBeans upon shutdown (#68)
This is an automated email from the ASF dual-hosted git repository.
wuzhiguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-metrics.git
The following commit(s) were added to refs/heads/master by this push:
new 02af960 AMBARI-25635: Clear Cluster and METRIC_AGGREGATORS MBeans upon shutdown (#68)
02af960 is described below
commit 02af960cbde1bfce3ae4a9e7a78fc249402da8f7
Author: Zhiguo Wu <wu...@apache.org>
AuthorDate: Thu Nov 17 17:27:49 2022 +0800
AMBARI-25635: Clear Cluster and METRIC_AGGREGATORS MBeans upon shutdown (#68)
---
.../availability/MetricCollectorHAController.java | 175 ++++++++++-----------
.../MetricCollectorHAControllerTest.java | 3 +-
2 files changed, 82 insertions(+), 96 deletions(-)
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
index adbe8e7..a27d13a 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,51 +41,54 @@ import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.NotificationContext;
-import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.OnlineOfflineSMD;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.tools.StateModelConfigGenerator;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
-;
-
public class MetricCollectorHAController {
private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class);
+ @VisibleForTesting
static final String CLUSTER_NAME = "ambari-metrics-cluster";
+ @VisibleForTesting
static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
+ @VisibleForTesting
static final String DEFAULT_STATE_MODEL = OnlineOfflineSMD.name;
- static final String INSTANCE_NAME_DELIMITER = "_";
+ private static final String INSTANCE_NAME_DELIMITER = "_";
+ private static final int PARTITION_NUMBER = 2;
+ private static final int REPLICATION_FACTOR = 1;
+ @VisibleForTesting
final String zkConnectUrl;
- final String instanceHostname;
- final InstanceConfig instanceConfig;
- final AggregationTaskRunner aggregationTaskRunner;
- final TimelineMetricConfiguration configuration;
+ private final String instanceHostname;
+ private final InstanceConfig instanceConfig;
+ private final AggregationTaskRunner aggregationTaskRunner;
// Cache list of known live instances
- final List<String> liveInstanceNames = new ArrayList<>();
+ private final List<String> liveInstanceNames = new ArrayList<>(2);
+ private final LiveInstanceTracker liveInstanceTracker = new LiveInstanceTracker();
// Helix Admin
+ @VisibleForTesting
HelixAdmin admin;
// Helix Manager
- HelixManager manager;
+ private HelixManager manager;
private volatile boolean isInitialized = false;
public MetricCollectorHAController(TimelineMetricConfiguration configuration) {
- this.configuration = configuration;
String instancePort;
try {
instanceHostname = configuration.getInstanceHostnameFromEnv();
instancePort = configuration.getInstancePort();
-
} catch (Exception e) {
LOG.error("Error reading configs from classpath, will resort to defaults.", e);
throw new MetricsSystemInitializationException(e.getMessage());
@@ -97,42 +99,32 @@ public class MetricCollectorHAController {
String zkQuorum = configuration.getClusterZKQuorum();
if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
- throw new Exception("Unable to parse zookeeper quorum. clientPort = "
- + zkClientPort +", quorum = " + zkQuorum);
+ throw new Exception(String.format("Unable to parse zookeeper quorum. clientPort = %s, quorum = %s", zkClientPort, zkQuorum));
}
zkConnectUrl = configuration.getZkConnectionUrl(zkClientPort, zkQuorum);
-
} catch (Exception e) {
LOG.error("Unable to load hbase-site from classpath.", e);
- throw new MetricsSystemInitializationException(e.getMessage());
+ throw new MetricsSystemInitializationException(e.getMessage(), e);
}
instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
instanceConfig.setHostName(instanceHostname);
instanceConfig.setPort(instancePort);
instanceConfig.setInstanceEnabled(true);
- aggregationTaskRunner = new AggregationTaskRunner(
- instanceConfig.getInstanceName(), zkConnectUrl, getClusterName());
- }
-
- /**
- * Name of Helix znode
- */
- public String getClusterName() {
- return CLUSTER_NAME;
+ aggregationTaskRunner = new AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl, CLUSTER_NAME);
}
/**
* Initialize the instance with zookeeper via Helix
*/
public void initializeHAController() throws Exception {
- String clusterName = getClusterName();
+ // Create setup tool instance
admin = new ZKHelixAdmin(zkConnectUrl);
- // create cluster
- LOG.info("Creating zookeeper cluster node: " + clusterName);
- boolean clusterAdded = admin.addCluster(clusterName, false);
- LOG.info("Was cluster added successfully? " + clusterAdded);
+ // Create cluster namespace in zookeeper. Don't recreate if exists.
+ LOG.info(String.format("Creating zookeeper cluster node: %s", CLUSTER_NAME));
+ boolean clusterAdded = admin.addCluster(CLUSTER_NAME, false);
+ LOG.info(String.format("Was cluster added successfully? %s", clusterAdded));
// Adding host to the cluster
boolean success = false;
@@ -141,16 +133,16 @@ public class MetricCollectorHAController {
for (int i = 0; i < tries && !success; i++) {
try {
- List<String> nodes = admin.getInstancesInCluster(clusterName);
- if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
- LOG.info("Adding participant instance " + instanceConfig);
- admin.addInstance(clusterName, instanceConfig);
+ List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
+ if (!nodes.contains(instanceConfig.getInstanceName())) {
+ LOG.info(String.format("Adding participant instance %s", instanceConfig));
+ admin.addInstance(CLUSTER_NAME, instanceConfig);
}
success = true;
} catch (HelixException | ZkNoNodeException ex) {
LOG.warn("Helix Cluster not yet setup fully.");
if (i < tries - 1) {
- LOG.info("Waiting for " + sleepTimeInSeconds + " seconds and retrying.");
+ LOG.info(String.format("Waiting for %d seconds and retrying.", sleepTimeInSeconds));
TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
} else {
LOG.error(ex);
@@ -159,34 +151,32 @@ public class MetricCollectorHAController {
}
if (!success) {
- LOG.info("Trying to create " + clusterName + " again since waiting for the creation did not help.");
- admin.addCluster(clusterName, true);
- List<String> nodes = admin.getInstancesInCluster(clusterName);
- if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
- LOG.info("Adding participant instance " + instanceConfig);
- admin.addInstance(clusterName, instanceConfig);
+ LOG.info(String.format("Trying to create %s again since waiting for the creation did not help.", CLUSTER_NAME));
+ admin.addCluster(CLUSTER_NAME, true);
+ List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
+ if (!nodes.contains(instanceConfig.getInstanceName())) {
+ LOG.info(String.format("Adding participant instance %s", instanceConfig));
+ admin.addInstance(CLUSTER_NAME, instanceConfig);
}
}
- // Add a state model
- if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) {
+ // Add an ONLINE-OFFLINE state model
+ if (admin.getStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL) == null) {
LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
- admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition(
- StateModelConfigGenerator.generateConfigForOnlineOffline()));
+ admin.addStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL, OnlineOfflineSMD.build());
}
// Add resources with 1 cluster-wide replica
// Since our aggregators are unbalanced in terms of work distribution we
// only need to distribute writes to METRIC_AGGREGATE and
- // METRIC_RECORD_MINUTE
- List<String> resources = admin.getResourcesInCluster(clusterName);
+ // METRIC_RECORD_MINUTE, i.e. the Host level and Cluster level aggregations
+ List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
if (!resources.contains(METRIC_AGGREGATORS)) {
- LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas");
- admin.addResource(clusterName, METRIC_AGGREGATORS, 2, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
+ LOG.info(String.format("Adding resource %s with %d partitions and %d replicas", METRIC_AGGREGATORS, PARTITION_NUMBER, REPLICATION_FACTOR));
+ admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, PARTITION_NUMBER, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
}
- // this will set up the ideal state, it calculates the preference list for
- // each partition similar to consistent hashing
- admin.rebalance(clusterName, METRIC_AGGREGATORS, 1);
+ // This sets up the ideal state: it calculates the preference list for each partition, similar to consistent hashing.
+ admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, REPLICATION_FACTOR);
// Start participant
startAggregators();
@@ -194,13 +184,9 @@ public class MetricCollectorHAController {
// Start controller
startController();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- aggregationTaskRunner.stop();
- manager.disconnect();
- }
- });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ shutdownHAController();
+ }));
isInitialized = true;
}
@@ -215,24 +201,33 @@ public class MetricCollectorHAController {
private void startAggregators() {
try {
aggregationTaskRunner.initialize();
-
} catch (Exception e) {
LOG.error("Unable to start aggregators.", e);
- throw new MetricsSystemInitializationException(e.getMessage());
+ throw new MetricsSystemInitializationException(e.getMessage(), e);
}
}
private void startController() throws Exception {
- manager = HelixManagerFactory.getZKHelixManager(
- getClusterName(),
- instanceHostname,
- InstanceType.CONTROLLER,
- zkConnectUrl
- );
+ manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceHostname, InstanceType.CONTROLLER, zkConnectUrl);
manager.connect();
- HelixController controller = new HelixController();
- manager.addLiveInstanceChangeListener(controller);
+ manager.addLiveInstanceChangeListener(liveInstanceTracker);
+ }
+
+ public void shutdownHAController() {
+ if (isInitialized) {
+ LOG.info("Shooting down Metrics Collector's HAController.");
+
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(CLUSTER_NAME);
+ manager.removeListener(keyBuilder.liveInstances(), liveInstanceTracker);
+ liveInstanceTracker.shutdown();
+ aggregationTaskRunner.stop();
+ manager.disconnect();
+ admin.close();
+
+ isInitialized = false;
+ LOG.info("Shutdown of Metrics Collector's HAController finished.");
+ }
}
public AggregationTaskRunner getAggregationTaskRunner() {
@@ -240,7 +235,7 @@ public class MetricCollectorHAController {
}
public List<String> getLiveInstanceHostNames() {
- List<String> liveInstanceHostNames = new ArrayList<>();
+ List<String> liveInstanceHostNames = new ArrayList<>(2);
for (String instance : liveInstanceNames) {
liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
@@ -249,52 +244,44 @@ public class MetricCollectorHAController {
return liveInstanceHostNames;
}
- public class HelixController extends GenericHelixController {
- ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- Joiner joiner = Joiner.on(", ").skipNulls();
+ public final class LiveInstanceTracker implements LiveInstanceChangeListener {
+ private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ private final Joiner joiner = Joiner.on(", ").skipNulls();
@Override
public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
- super.onLiveInstanceChange(liveInstances, changeContext);
-
liveInstanceNames.clear();
for (LiveInstance instance : liveInstances) {
liveInstanceNames.add(instance.getInstanceName());
}
- LOG.info("Detected change in liveliness of Collector instances. " +
- "LiveIsntances = " + joiner.join(liveInstanceNames));
+ LOG.info(String.format("Detected change in liveliness of Collector instances. LiveInstances = %s", joiner.join(liveInstanceNames)));
// Print HA state - after some delay
- executorService.schedule(new Runnable() {
- @Override
- public void run() {
- printClusterState();
- }
- }, 30, TimeUnit.SECONDS);
-
+ executorService.schedule(() -> printClusterState(), 30, TimeUnit.SECONDS);
+ }
+ public void shutdown() {
+ executorService.shutdown();
}
}
public void printClusterState() {
StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");
- ExternalView resourceExternalView = admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS);
+ ExternalView resourceExternalView = admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
if (resourceExternalView != null) {
- getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
+ getPrintableResourceState(resourceExternalView, sb);
}
sb.append("\n##################################################");
LOG.info(sb.toString());
}
- private void getPrintableResourceState(ExternalView resourceExternalView,
- String resourceName,
- StringBuilder sb) {
+ private void getPrintableResourceState(ExternalView resourceExternalView, StringBuilder sb) {
TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet());
sb.append("\nCLUSTER: ");
- sb.append(getClusterName());
+ sb.append(CLUSTER_NAME);
sb.append("\nRESOURCE: ");
- sb.append(resourceName);
+ sb.append(MetricCollectorHAController.METRIC_AGGREGATORS);
for (String partitionName : sortedSet) {
sb.append("\nPARTITION: ");
sb.append(partitionName).append("\t");
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
index 385a5a1..8b8d60b 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
@@ -100,7 +100,6 @@ public class MetricCollectorHAControllerTest extends AbstractMiniHBaseClusterTes
// Re-assigned partitions
Assert.assertEquals(2, partitionInstanceMap.size());
- haController.getAggregationTaskRunner().stop();
- haController.manager.disconnect();
+ haController.shutdownHAController();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ambari.apache.org
For additional commands, e-mail: commits-help@ambari.apache.org