You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/04/07 01:20:36 UTC

[helix] 07/12: Modify participant manager to add cluster auto registration logic (#695)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c2f0dc3e6007762d55ee31c713354ef3d21222c1
Author: zhangmeng916 <56...@users.noreply.github.com>
AuthorDate: Fri Feb 7 15:41:36 2020 -0800

    Modify participant manager to add cluster auto registration logic (#695)
    
    Modify the participant logic to support auto registration of an instance to a Helix cluster.
    Auto registration differs from the existing auto join in that auto registration also retrieves the fault domain information for the instance and populates it under instanceConfig. With auto registration on, the user does not need to manually populate instance information in ZooKeeper.
---
 .../helix/manager/zk/ParticipantManager.java       | 99 +++++++++++++++++-----
 .../main/java/org/apache/helix/util/HelixUtil.java | 19 +++++
 .../manager/MockParticipantManager.java            | 10 ++-
 3 files changed, 106 insertions(+), 22 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index a9b5bde..8c7e8f4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -20,6 +20,8 @@ package org.apache.helix.manager.zk;
  */
 
 import java.lang.management.ManagementFactory;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +31,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixCloudProperty;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperty;
@@ -36,6 +39,8 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.LiveInstanceInfoProvider;
 import org.apache.helix.PreConnectCallback;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.api.cloud.CloudInstanceInformation;
+import org.apache.helix.api.cloud.CloudInstanceInformationProcessor;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
@@ -47,6 +52,7 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
@@ -62,6 +68,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ParticipantManager {
   private static Logger LOG = LoggerFactory.getLogger(ParticipantManager.class);
+  private static final String CLOUD_PROCESSOR_PATH_PREFIX = "org.apache.helix.cloud.";
 
   final HelixZkClient _zkclient;
   final HelixManager _manager;
@@ -117,7 +124,6 @@ public class ParticipantManager {
    * Handles a new session for a participant. The new session's id is passed in when participant
    * manager is created, as it is required to prevent ephemeral node creation from session race
    * condition: ephemeral node is created by an expired or unexpected session.
-   *
    * @throws Exception
    */
   public void handleNewSession() throws Exception {
@@ -154,18 +160,31 @@ public class ParticipantManager {
   }
 
   private void joinCluster() {
-    // Read cluster config and see if instance can auto join the cluster
+    // Read cluster config and see if an instance can auto join or auto register to the cluster
     boolean autoJoin = false;
+    boolean autoRegistration = false;
+
+    // Read "allowParticipantAutoJoin" flag to see if an instance can auto join to the cluster
     try {
-      HelixConfigScope scope =
-          new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
-              _manager.getClusterName()).build();
-      autoJoin =
-          Boolean.parseBoolean(_configAccessor.get(scope,
-              ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
+      HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER)
+          .forCluster(_manager.getClusterName()).build();
+      autoJoin = Boolean
+          .parseBoolean(_configAccessor.get(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
       LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
     } catch (Exception e) {
-      // autoJoin is false
+      LOG.info("auto join is false for cluster" + _clusterName);
+    }
+
+    // Read cloud config and see if an instance can auto register to the cluster
+    // Difference between auto join and auto registration is that the latter will also populate the
+    // domain information in instance config
+    try {
+      autoRegistration =
+          Boolean.valueOf(_helixManagerProperty.getHelixCloudProperty().getCloudEnabled());
+      LOG.info("instance: " + _instanceName + " auto-register " + _clusterName + " is "
+          + autoRegistration);
+    } catch (Exception e) {
+      LOG.info("auto registration is false for cluster" + _clusterName);
     }
 
     if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) {
@@ -173,23 +192,61 @@ public class ParticipantManager {
         throw new HelixException("Initial cluster structure is not set up for instance: "
             + _instanceName + ", instanceType: " + _instanceType);
       } else {
-        LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
-        InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
-        String hostName = _instanceName;
-        String port = "";
-        int lastPos = _instanceName.lastIndexOf("_");
-        if (lastPos > 0) {
-          hostName = _instanceName.substring(0, lastPos);
-          port = _instanceName.substring(lastPos + 1);
+        if (!autoRegistration) {
+          LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
+          _helixAdmin.addInstance(_clusterName, HelixUtil.composeInstanceConfig(_instanceName));
+        } else {
+          LOG.info(_instanceName + " is auto-registering cluster: " + _clusterName);
+          CloudInstanceInformation cloudInstanceInformation = getCloudInstanceInformation();
+          String domain = cloudInstanceInformation
+              .get(CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name());
+
+          // Disable the verification for now
+          /*String cloudIdInRemote = cloudInstanceInformation
+              .get(CloudInstanceInformation.CloudInstanceField.INSTANCE_SET_NAME.name());
+          String cloudIdInConfig = _configAccessor.getCloudConfig(_clusterName).getCloudID();
+
+          // validate that the instance is auto registering to the correct cluster
+          if (!cloudIdInRemote.equals(cloudIdInConfig)) {
+            throw new IllegalArgumentException(String.format(
+                "cloudId in config: %s is not consistent with cloudId from remote: %s. The instance is auto registering to a wrong cluster.",
+                cloudIdInConfig, cloudIdInRemote));
+          }*/
+
+          InstanceConfig instanceConfig = HelixUtil.composeInstanceConfig(_instanceName);
+          instanceConfig.setDomain(domain);
+          _helixAdmin.addInstance(_clusterName, instanceConfig);
         }
-        instanceConfig.setHostName(hostName);
-        instanceConfig.setPort(port);
-        instanceConfig.setInstanceEnabled(true);
-        _helixAdmin.addInstance(_clusterName, instanceConfig);
       }
     }
   }
 
+  private CloudInstanceInformation getCloudInstanceInformation() {
+    String cloudInstanceInformationProcessorName =
+        _helixManagerProperty.getHelixCloudProperty().getCloudInfoProcessorName();
+    try {
+      // fetch cloud instance information for the instance
+      String cloudInstanceInformationProcessorClassName = CLOUD_PROCESSOR_PATH_PREFIX
+          + _helixManagerProperty.getHelixCloudProperty().getCloudProvider().toLowerCase() + "."
+          + cloudInstanceInformationProcessorName;
+      Class processorClass = Class.forName(cloudInstanceInformationProcessorClassName);
+      Constructor constructor = processorClass.getConstructor(HelixCloudProperty.class);
+      CloudInstanceInformationProcessor processor = (CloudInstanceInformationProcessor) constructor
+          .newInstance(_helixManagerProperty.getHelixCloudProperty());
+      List<String> responses = processor.fetchCloudInstanceInformation();
+
+      // parse cloud instance information for the participant
+      CloudInstanceInformation cloudInstanceInformation =
+          processor.parseCloudInstanceInformation(responses);
+      return cloudInstanceInformation;
+    } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException
+        | IllegalAccessException | InvocationTargetException ex) {
+      throw new HelixException(
+          "Failed to create a new instance for the class: " + cloudInstanceInformationProcessorName,
+          ex);
+    }
+  }
+
   private void createLiveInstance() {
     String liveInstancePath = _keyBuilder.liveInstance(_instanceName).getPath();
     LiveInstance liveInstance = new LiveInstance(_instanceName);
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index bfda60e..3480fb6 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -309,4 +309,23 @@ public final class HelixUtil {
     return propertyDefaultValue;
   }
 
+  /**
+   * Compose the config for an instance
+   * @param instanceName
+   * @return InstanceConfig
+   */
+  public static InstanceConfig composeInstanceConfig(String instanceName) {
+    InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+    String hostName = instanceName;
+    String port = "";
+    int lastPos = instanceName.lastIndexOf("_");
+    if (lastPos > 0) {
+      hostName = instanceName.substring(0, lastPos);
+      port = instanceName.substring(lastPos + 1);
+    }
+    instanceConfig.setHostName(hostName);
+    instanceConfig.setPort(port);
+    instanceConfig.setInstanceEnabled(true);
+    return instanceConfig;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 0036e0d..4f1b966 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration.manager;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.helix.HelixCloudProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixManager;
@@ -48,6 +49,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
   protected MockMSModelFactory _msModelFactory;
   protected DummyLeaderStandbyStateModelFactory _lsModelFactory;
   protected DummyOnlineOfflineStateModelFactory _ofModelFactory;
+  protected HelixCloudProperty _helixCloudProperty;
 
   public MockParticipantManager(String zkAddr, String clusterName, String instanceName) {
     this(zkAddr, clusterName, instanceName, 10);
@@ -55,11 +57,17 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
 
   public MockParticipantManager(String zkAddr, String clusterName, String instanceName,
       int transDelay) {
+    this(zkAddr, clusterName, instanceName, transDelay, null);
+  }
+
+  public MockParticipantManager(String zkAddr, String clusterName, String instanceName,
+      int transDelay, HelixCloudProperty helixCloudProperty) {
     super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
     _transDelay = transDelay;
     _msModelFactory = new MockMSModelFactory(null);
     _lsModelFactory = new DummyLeaderStandbyStateModelFactory(_transDelay);
     _ofModelFactory = new DummyOnlineOfflineStateModelFactory(_transDelay);
+    _helixCloudProperty = helixCloudProperty;
   }
 
   public void setTransition(MockTransition transition) {
@@ -136,4 +144,4 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
   public List<CallbackHandler> getHandlers() {
     return _handlers;
   }
-}
+}
\ No newline at end of file