Posted to commits@ambari.apache.org by wu...@apache.org on 2022/11/17 09:27:53 UTC

[ambari-metrics] branch master updated: AMBARI-25635: Clear Cluster and METRIC_AGGREGATORS MBeans upon shutdown (#68)

This is an automated email from the ASF dual-hosted git repository.

wuzhiguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-metrics.git


The following commit(s) were added to refs/heads/master by this push:
     new 02af960  AMBARI-25635: Clear Cluster and METRIC_AGGREGATORS MBeans upon shutdown (#68)
02af960 is described below

commit 02af960cbde1bfce3ae4a9e7a78fc249402da8f7
Author: Zhiguo Wu <wu...@apache.org>
AuthorDate: Thu Nov 17 17:27:49 2022 +0800

    AMBARI-25635: Clear Cluster and METRIC_AGGREGATORS MBeans upon shutdown (#68)
---
 .../availability/MetricCollectorHAController.java  | 175 ++++++++++-----------
 .../MetricCollectorHAControllerTest.java           |   3 +-
 2 files changed, 82 insertions(+), 96 deletions(-)
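
For orientation before the diff: the commit consolidates teardown into a single idempotent shutdownHAController() method, registers it as a JVM shutdown hook, and replaces the anonymous GenericHelixController with a dedicated LiveInstanceTracker listener so it can be detached cleanly; per the commit title, running that teardown is what clears the Cluster and METRIC_AGGREGATORS MBeans. A condensed Java sketch of the shutdown path, assembled from the hunks below (not a drop-in class: the fields are the ones created in initializeHAController(), and logging is omitted):

    Runtime.getRuntime().addShutdownHook(new Thread(this::shutdownHAController));

    public void shutdownHAController() {
      if (isInitialized) {
        // Detach the live-instance listener before disconnecting from Helix
        PropertyKey.Builder keyBuilder = new PropertyKey.Builder(CLUSTER_NAME);
        manager.removeListener(keyBuilder.liveInstances(), liveInstanceTracker);
        liveInstanceTracker.shutdown();  // stops the scheduled printClusterState() task
        aggregationTaskRunner.stop();    // participant leaves the Helix cluster
        manager.disconnect();            // controller disconnects from ZooKeeper
        admin.close();                   // releases the ZKHelixAdmin connection
        isInitialized = false;
      }
    }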

diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
index adbe8e7..a27d13a 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,51 +41,54 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.OnlineOfflineSMD;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.tools.StateModelConfigGenerator;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 
-;
-
 public class MetricCollectorHAController {
   private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class);
 
+  @VisibleForTesting
   static final String CLUSTER_NAME = "ambari-metrics-cluster";
+  @VisibleForTesting
   static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
+  @VisibleForTesting
   static final String DEFAULT_STATE_MODEL = OnlineOfflineSMD.name;
-  static final String INSTANCE_NAME_DELIMITER = "_";
+  private static final String INSTANCE_NAME_DELIMITER = "_";
+  private static final int PARTITION_NUMBER = 2;
+  private static final int REPLICATION_FACTOR = 1;
 
+  @VisibleForTesting
   final String zkConnectUrl;
-  final String instanceHostname;
-  final InstanceConfig instanceConfig;
-  final AggregationTaskRunner aggregationTaskRunner;
-  final TimelineMetricConfiguration configuration;
+  private final String instanceHostname;
+  private final InstanceConfig instanceConfig;
+  private final AggregationTaskRunner aggregationTaskRunner;
 
   // Cache list of known live instances
-  final List<String> liveInstanceNames = new ArrayList<>();
+  private final List<String> liveInstanceNames = new ArrayList<>(2);
+  private final LiveInstanceTracker liveInstanceTracker = new LiveInstanceTracker();
 
   // Helix Admin
+  @VisibleForTesting
   HelixAdmin admin;
   // Helix Manager
-  HelixManager manager;
+  private HelixManager manager;
 
   private volatile boolean isInitialized = false;
 
   public MetricCollectorHAController(TimelineMetricConfiguration configuration) {
-    this.configuration = configuration;
     String instancePort;
     try {
       instanceHostname = configuration.getInstanceHostnameFromEnv();
       instancePort = configuration.getInstancePort();
-
     } catch (Exception e) {
       LOG.error("Error reading configs from classpath, will resort to defaults.", e);
       throw new MetricsSystemInitializationException(e.getMessage());
@@ -97,42 +99,32 @@ public class MetricCollectorHAController {
       String zkQuorum = configuration.getClusterZKQuorum();
 
       if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
-        throw new Exception("Unable to parse zookeeper quorum. clientPort = "
-          + zkClientPort +", quorum = " + zkQuorum);
+        throw new Exception(String.format("Unable to parse zookeeper quorum. clientPort = %s, quorum = %s", zkClientPort, zkQuorum));
       }
 
       zkConnectUrl = configuration.getZkConnectionUrl(zkClientPort, zkQuorum);
-
     } catch (Exception e) {
       LOG.error("Unable to load hbase-site from classpath.", e);
-      throw new MetricsSystemInitializationException(e.getMessage());
+      throw new MetricsSystemInitializationException(e.getMessage(), e);
     }
 
     instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
     instanceConfig.setHostName(instanceHostname);
     instanceConfig.setPort(instancePort);
     instanceConfig.setInstanceEnabled(true);
-    aggregationTaskRunner = new AggregationTaskRunner(
-      instanceConfig.getInstanceName(), zkConnectUrl, getClusterName());
-  }
-
-  /**
-   * Name of Helix znode
-   */
-  public String getClusterName() {
-    return CLUSTER_NAME;
+    aggregationTaskRunner = new AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl, CLUSTER_NAME);
   }
 
   /**
    * Initialize the instance with zookeeper via Helix
    */
   public void initializeHAController() throws Exception {
-    String clusterName = getClusterName();
+    // Create setup tool instance
     admin = new ZKHelixAdmin(zkConnectUrl);
-    // create cluster
-    LOG.info("Creating zookeeper cluster node: " + clusterName);
-    boolean clusterAdded = admin.addCluster(clusterName, false);
-    LOG.info("Was cluster added successfully? " + clusterAdded);
+    // Create cluster namespace in zookeeper. Don't recreate if exists.
+    LOG.info(String.format("Creating zookeeper cluster node: %s", CLUSTER_NAME));
+    boolean clusterAdded = admin.addCluster(CLUSTER_NAME, false);
+    LOG.info(String.format("Was cluster added successfully? %s", clusterAdded));
 
     // Adding host to the cluster
     boolean success = false;
@@ -141,16 +133,16 @@ public class MetricCollectorHAController {
 
     for (int i = 0; i < tries && !success; i++) {
       try {
-        List<String> nodes = admin.getInstancesInCluster(clusterName);
-        if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
-          LOG.info("Adding participant instance " + instanceConfig);
-          admin.addInstance(clusterName, instanceConfig);
+        List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
+        if (!nodes.contains(instanceConfig.getInstanceName())) {
+          LOG.info(String.format("Adding participant instance %s", instanceConfig));
+          admin.addInstance(CLUSTER_NAME, instanceConfig);
         }
         success = true;
       } catch (HelixException | ZkNoNodeException ex) {
         LOG.warn("Helix Cluster not yet setup fully.");
         if (i < tries - 1) {
-          LOG.info("Waiting for " + sleepTimeInSeconds + " seconds and retrying.");
+          LOG.info(String.format("Waiting for %d seconds and retrying.", sleepTimeInSeconds));
           TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
         } else {
           LOG.error(ex);
@@ -159,34 +151,32 @@ public class MetricCollectorHAController {
     }
 
     if (!success) {
-      LOG.info("Trying to create " + clusterName + " again since waiting for the creation did not help.");
-      admin.addCluster(clusterName, true);
-      List<String> nodes = admin.getInstancesInCluster(clusterName);
-      if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
-        LOG.info("Adding participant instance " + instanceConfig);
-        admin.addInstance(clusterName, instanceConfig);
+      LOG.info(String.format("Trying to create %s again since waiting for the creation did not help.", CLUSTER_NAME));
+      admin.addCluster(CLUSTER_NAME, true);
+      List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
+      if (!nodes.contains(instanceConfig.getInstanceName())) {
+        LOG.info(String.format("Adding participant instance %s", instanceConfig));
+        admin.addInstance(CLUSTER_NAME, instanceConfig);
       }
     }
 
-    // Add a state model
-    if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) {
+    // Add an ONLINE-OFFLINE state model
+    if (admin.getStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL) == null) {
       LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
-      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition(
-        StateModelConfigGenerator.generateConfigForOnlineOffline()));
+      admin.addStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL, OnlineOfflineSMD.build());
     }
 
     // Add resources with 1 cluster-wide replica
     // Since our aggregators are unbalanced in terms of work distribution we
     // only need to distribute writes to METRIC_AGGREGATE and
-    // METRIC_RECORD_MINUTE
-    List<String> resources = admin.getResourcesInCluster(clusterName);
+    // METRIC_RECORD_MINUTE, i.e. the Host level and Cluster level aggregations
+    List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
     if (!resources.contains(METRIC_AGGREGATORS)) {
-      LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas");
-      admin.addResource(clusterName, METRIC_AGGREGATORS, 2, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
+      LOG.info(String.format("Adding resource %s with %d partitions and %d replicas", METRIC_AGGREGATORS, PARTITION_NUMBER, REPLICATION_FACTOR));
+      admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, PARTITION_NUMBER, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
     }
-    // this will set up the ideal state, it calculates the preference list for
-    // each partition similar to consistent hashing
-    admin.rebalance(clusterName, METRIC_AGGREGATORS, 1);
+    // This will set up the ideal state; it calculates the preference list for each partition, similar to consistent hashing.
+    admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, REPLICATION_FACTOR);
 
     // Start participant
     startAggregators();
@@ -194,13 +184,9 @@ public class MetricCollectorHAController {
     // Start controller
     startController();
 
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        aggregationTaskRunner.stop();
-        manager.disconnect();
-      }
-    });
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      shutdownHAController();
+    }));
 
     isInitialized = true;
   }
@@ -215,24 +201,33 @@ public class MetricCollectorHAController {
   private void startAggregators() {
     try {
       aggregationTaskRunner.initialize();
-
     } catch (Exception e) {
       LOG.error("Unable to start aggregators.", e);
-      throw new MetricsSystemInitializationException(e.getMessage());
+      throw new MetricsSystemInitializationException(e.getMessage(), e);
     }
   }
 
   private void startController() throws Exception {
-    manager = HelixManagerFactory.getZKHelixManager(
-      getClusterName(),
-      instanceHostname,
-      InstanceType.CONTROLLER,
-      zkConnectUrl
-    );
+    manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceHostname, InstanceType.CONTROLLER, zkConnectUrl);
 
     manager.connect();
-    HelixController controller = new HelixController();
-    manager.addLiveInstanceChangeListener(controller);
+    manager.addLiveInstanceChangeListener(liveInstanceTracker);
+  }
+
+  public void shutdownHAController() {
+    if (isInitialized) {
+      LOG.info("Shooting down Metrics Collector's HAController.");
+
+      PropertyKey.Builder keyBuilder = new PropertyKey.Builder(CLUSTER_NAME);
+      manager.removeListener(keyBuilder.liveInstances(), liveInstanceTracker);
+      liveInstanceTracker.shutdown();
+      aggregationTaskRunner.stop();
+      manager.disconnect();
+      admin.close();
+
+      isInitialized = false;
+      LOG.info("Shutdown of Metrics Collector's HAController finished.");
+    }
   }
 
   public AggregationTaskRunner getAggregationTaskRunner() {
@@ -240,7 +235,7 @@ public class MetricCollectorHAController {
   }
 
   public List<String> getLiveInstanceHostNames() {
-    List<String> liveInstanceHostNames = new ArrayList<>();
+    List<String> liveInstanceHostNames = new ArrayList<>(2);
 
     for (String instance : liveInstanceNames) {
       liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
@@ -249,52 +244,44 @@ public class MetricCollectorHAController {
     return liveInstanceHostNames;
   }
 
-  public class HelixController extends GenericHelixController {
-    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
-    Joiner joiner = Joiner.on(", ").skipNulls();
+  public final class LiveInstanceTracker implements LiveInstanceChangeListener {
+    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+    private final Joiner joiner = Joiner.on(", ").skipNulls();
 
     @Override
     public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
-      super.onLiveInstanceChange(liveInstances, changeContext);
-
       liveInstanceNames.clear();
       for (LiveInstance instance : liveInstances) {
         liveInstanceNames.add(instance.getInstanceName());
       }
 
-      LOG.info("Detected change in liveliness of Collector instances. " +
-        "LiveIsntances = " + joiner.join(liveInstanceNames));
+      LOG.info(String.format("Detected change in liveliness of Collector instances. LiveInstances = %s", joiner.join(liveInstanceNames)));
       // Print HA state - after some delay
-      executorService.schedule(new Runnable() {
-        @Override
-        public void run() {
-          printClusterState();
-        }
-      }, 30, TimeUnit.SECONDS);
-
+      executorService.schedule(() -> printClusterState(), 30, TimeUnit.SECONDS);
+    }
 
+    public void shutdown() {
+      executorService.shutdown();
     }
   }
 
   public void printClusterState() {
     StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");
 
-    ExternalView resourceExternalView = admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS);
+    ExternalView resourceExternalView = admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
     if (resourceExternalView != null) {
-      getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
+      getPrintableResourceState(resourceExternalView, sb);
     }
     sb.append("\n##################################################");
     LOG.info(sb.toString());
   }
 
-  private void getPrintableResourceState(ExternalView resourceExternalView,
-                                         String resourceName,
-                                         StringBuilder sb) {
+  private void getPrintableResourceState(ExternalView resourceExternalView, StringBuilder sb) {
     TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet());
     sb.append("\nCLUSTER: ");
-    sb.append(getClusterName());
+    sb.append(CLUSTER_NAME);
     sb.append("\nRESOURCE: ");
-    sb.append(resourceName);
+    sb.append(MetricCollectorHAController.METRIC_AGGREGATORS);
     for (String partitionName : sortedSet) {
       sb.append("\nPARTITION: ");
       sb.append(partitionName).append("\t");
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
index 385a5a1..8b8d60b 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
@@ -100,7 +100,6 @@ public class MetricCollectorHAControllerTest extends AbstractMiniHBaseClusterTes
     // Re-assigned partitions
     Assert.assertEquals(2, partitionInstanceMap.size());
 
-    haController.getAggregationTaskRunner().stop();
-    haController.manager.disconnect();
+    haController.shutdownHAController();
   }
 }
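
The test change above also shows the intended call pattern: callers no longer stop the task runner and reach into haController.manager individually. Roughly, assuming a TimelineMetricConfiguration is in scope (variable names here are illustrative):

    MetricCollectorHAController haController = new MetricCollectorHAController(configuration);
    haController.initializeHAController();
    // ... exercise getAggregationTaskRunner(), getLiveInstanceHostNames(), printClusterState() ...
    haController.shutdownHAController();  // replaces getAggregationTaskRunner().stop() + manager.disconnect()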


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ambari.apache.org
For additional commands, e-mail: commits-help@ambari.apache.org