You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/01/06 23:24:36 UTC

[helix] branch helix-cloud updated: Add java API to create cluster with CloudConfig

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch helix-cloud
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-cloud by this push:
     new 6f869e8  Add java API to create cluster with CloudConfig
6f869e8 is described below

commit 6f869e873c8f248e7b678b2b9053aa6973cf0f4e
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Tue Nov 26 15:26:10 2019 -0800

    Add java API to create cluster with CloudConfig
    
    In this commit the below APIs have been added.
    1- AddCluster with CloudCOnfig API.
    2- addCloudConfig to existing cluster.
    3- Update CloudConfig to update existing Cloud Config.
    Several tests have been added to test these APIs.
---
 .../main/java/org/apache/helix/ConfigAccessor.java |   4 +-
 .../src/main/java/org/apache/helix/HelixAdmin.java |  15 ++-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  28 ++++
 .../java/org/apache/helix/model/CloudConfig.java   | 149 +++++++--------------
 .../java/org/apache/helix/tools/ClusterSetup.java  |  17 ++-
 .../java/org/apache/helix/TestConfigAccessor.java  |  66 +++++++--
 .../apache/helix/manager/zk/TestZkHelixAdmin.java  |  69 ++++++++++
 .../java/org/apache/helix/mock/MockHelixAdmin.java |  11 ++
 .../apache/helix/model/cloud/TestCloudConfig.java  |  60 ++++++---
 .../org/apache/helix/tools/TestClusterSetup.java   |  88 +++++++++++-
 10 files changed, 368 insertions(+), 139 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 264fd3f..97bfb34 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -591,7 +591,7 @@ public class ConfigAccessor {
    * @return The instance of {@link CloudConfig}
    */
   public CloudConfig getCloudConfig(String clusterName) {
-    if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
       throw new HelixException(
           String.format("Failed to get config. cluster: %s is not setup.", clusterName));
     }
@@ -604,7 +604,7 @@ public class ConfigAccessor {
       return null;
     }
 
-    return new CloudConfig(record);
+    return new CloudConfig.Builder(record).build();
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index a11b235..c3dc8b5 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -22,7 +22,7 @@ package org.apache.helix;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-
+import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ConstraintItem;
@@ -381,6 +381,19 @@ public interface HelixAdmin {
   void dropResource(String clusterName, String resourceName);
 
   /**
+   * Add cloud config to the cluster.
+   * @param clusterName
+   * @param cloudConfig
+   */
+  void addCloudConfig(String clusterName, CloudConfig cloudConfig);
+
+  /**
+   * Remove the Cloud Config for specific cluster
+   * @param clusterName
+   */
+  void removeCloudConfig(String clusterName);
+
+  /**
    * Get a list of state model definitions in a cluster
    * @param clusterName
    * @return
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 0a978e5..9d8d406 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -59,6 +59,7 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.manager.zk.client.SharedZkClientFactory;
+import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -1026,6 +1027,33 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
+  public void addCloudConfig(String clusterName, CloudConfig cloudConfig) {
+    logger.info("Add CloudConfig to cluster {}, CloudConfig is {}.", clusterName,
+        cloudConfig.toString());
+
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+
+    CloudConfig.Builder builder = new CloudConfig.Builder(cloudConfig);
+    CloudConfig cloudConfigBuilder = builder.build();
+
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.cloudConfig(), cloudConfigBuilder);
+  }
+
+  @Override
+  public void removeCloudConfig(String clusterName) {
+    logger.info("Remove Cloud Config for cluster {}.", clusterName);
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.removeProperty(keyBuilder.cloudConfig());
+  }
+
+  @Override
   public List<String> getStateModelDefs(String clusterName) {
     return _zkClient.getChildren(PropertyPathBuilder.stateModelDef(clusterName));
   }
diff --git a/helix-core/src/main/java/org/apache/helix/model/CloudConfig.java b/helix-core/src/main/java/org/apache/helix/model/CloudConfig.java
index c8ab6eb..f6c279c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/CloudConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/CloudConfig.java
@@ -30,6 +30,9 @@ import org.apache.helix.cloud.constants.CloudProvider;
  * Cloud configurations
  */
 public class CloudConfig extends HelixProperty {
+
+  public static final String CLOUD_CONFIG_KW = "CloudConfig";
+
   /**
    * Configurable characteristics of a cloud.
    * NOTE: Do NOT use this field name directly, use its corresponding getter/setter in the
@@ -52,39 +55,22 @@ public class CloudConfig extends HelixProperty {
 
   /**
    * Instantiate the CloudConfig for the cloud
-   * @param cluster
    */
-  public CloudConfig(String cluster) {
-    super(cluster);
+  private CloudConfig() {
+    super(CLOUD_CONFIG_KW);
   }
 
   /**
-   * Instantiate with a pre-populated record
-   * @param record a ZNRecord corresponding to a cloud configuration
+   * The constructor from the ZNRecord.
+   * @param record
    */
-  public CloudConfig(ZNRecord record) {
-    super(record);
+  private CloudConfig(ZNRecord record) {
+    super(CLOUD_CONFIG_KW);
+    _record.setSimpleFields(record.getSimpleFields());
+    _record.setListFields(record.getListFields());
+    _record.setMapFields(record.getMapFields());
   }
 
-  /**
-   * Instantiate the config using each field individually.
-   * Users should use CloudConfig.Builder to create CloudConfig.
-   * @param cluster
-   * @param enabled
-   * @param cloudID
-   */
-  public CloudConfig(String cluster, boolean enabled, CloudProvider cloudProvider, String cloudID,
-      List<String> cloudInfoSource, String cloudProcessorName) {
-    super(cluster);
-    _record.setBooleanField(CloudConfigProperty.CLOUD_ENABLED.name(), enabled);
-    _record.setSimpleField(CloudConfigProperty.CLOUD_PROVIDER.name(), cloudProvider.name());
-    _record.setSimpleField(CloudConfigProperty.CLOUD_ID.name(), cloudID);
-    if (cloudProvider.equals(CloudProvider.CUSTOMIZED)) {
-      _record
-          .setSimpleField(CloudConfigProperty.CLOUD_INFO_PROCESSOR_NAME.name(), cloudProcessorName);
-      _record.setListField(CloudConfigProperty.CLOUD_INFO_SOURCE.name(), cloudInfoSource);
-    }
-  }
 
   /**
    * Enable/Disable the CLOUD_ENABLED field.
@@ -135,15 +121,6 @@ public class CloudConfig extends HelixProperty {
   }
 
   /**
-   * Set the CLOUD_INFO_PROCESSOR_NAME field.
-   * @param cloudInfoProcessorName
-   */
-  public void setCloudInfoFProcessorName(String cloudInfoProcessorName) {
-    _record.setSimpleField(CloudConfigProperty.CLOUD_INFO_PROCESSOR_NAME.name(),
-        cloudInfoProcessorName);
-  }
-
-  /**
    * Get the CLOUD_INFO_PROCESSOR_NAME field.
    * @return CLOUD_INFO_PROCESSOR_NAME field.
    */
@@ -152,14 +129,6 @@ public class CloudConfig extends HelixProperty {
   }
 
   /**
-   * Set the CLOUD_PROVIDER field.
-   * @param cloudProvider
-   */
-  public void setCloudProvider(CloudProvider cloudProvider) {
-    _record.setSimpleField(CloudConfigProperty.CLOUD_PROVIDER.name(), cloudProvider.name());
-  }
-
-  /**
    * Get the CLOUD_PROVIDER field.
    * @return CLOUD_PROVIDER field.
    */
@@ -167,32 +136,28 @@ public class CloudConfig extends HelixProperty {
     return _record.getSimpleField(CloudConfigProperty.CLOUD_PROVIDER.name());
   }
 
+
   public static class Builder {
-    private String _clusterName = null;
-    private CloudProvider _cloudProvider;
-    private boolean _cloudEnabled = DEFAULT_CLOUD_ENABLED;
-    private String _cloudID;
-    private List<String> _cloudInfoSources;
-    private String _cloudInfoProcessorName;
+    private ZNRecord _record;
 
     public CloudConfig build() {
       validate();
-      return new CloudConfig(_clusterName, _cloudEnabled, _cloudProvider, _cloudID,
-          _cloudInfoSources, _cloudInfoProcessorName);
+      return new CloudConfig(_record);
     }
 
     /**
      * Default constructor
      */
     public Builder() {
+      _record = new ZNRecord(CLOUD_CONFIG_KW);
     }
 
     /**
-     * Constructor with Cluster Name as input
-     * @param clusterName
+     * Instantiate with a pre-populated record
+     * @param record a ZNRecord corresponding to a cloud configuration
      */
-    public Builder(String clusterName) {
-      _clusterName = clusterName;
+    public Builder(ZNRecord record) {
+      _record = record;
     }
 
     /**
@@ -200,89 +165,75 @@ public class CloudConfig extends HelixProperty {
      * @param cloudConfig
      */
     public Builder(CloudConfig cloudConfig) {
-      _cloudEnabled = cloudConfig.isCloudEnabled();
-      _cloudProvider = CloudProvider.valueOf(cloudConfig.getCloudProvider());
-      _cloudID = cloudConfig.getCloudID();
-      _cloudInfoSources = cloudConfig.getCloudInfoSources();
-      _cloudInfoProcessorName = cloudConfig.getCloudInfoProcessorName();
-    }
-
-    public Builder setClusterName(String v) {
-      _clusterName = v;
-      return this;
+      _record = cloudConfig.getRecord();
     }
 
     public Builder setCloudEnabled(boolean isEnabled) {
-      _cloudEnabled = isEnabled;
+      _record.setBooleanField(CloudConfigProperty.CLOUD_ENABLED.name(), isEnabled);
       return this;
     }
 
     public Builder setCloudProvider(CloudProvider cloudProvider) {
-      _cloudProvider = cloudProvider;
+      _record.setSimpleField(CloudConfigProperty.CLOUD_PROVIDER.name(), cloudProvider.name());
       return this;
     }
 
-    public Builder setCloudID(String v) {
-      _cloudID = v;
+    public Builder setCloudID(String cloudID) {
+      _record.setSimpleField(CloudConfigProperty.CLOUD_ID.name(), cloudID);
       return this;
     }
 
-    public Builder setCloudInfoSources(List<String> v) {
-      _cloudInfoSources = v;
+    public Builder setCloudInfoSources(List<String> cloudInfoSources) {
+      _record.setListField(CloudConfigProperty.CLOUD_INFO_SOURCE.name(), cloudInfoSources);
       return this;
     }
 
-    public Builder addCloudInfoSource(String v) {
-      if (_cloudInfoSources == null) {
-        _cloudInfoSources = new ArrayList<String>();
+    public Builder addCloudInfoSource(String cloudInfoSource) {
+      if (_record.getListField(CloudConfigProperty.CLOUD_INFO_SOURCE.name()) == null) {
+        _record.setListField(CloudConfigProperty.CLOUD_INFO_SOURCE.name(), new ArrayList<String>());
       }
-      _cloudInfoSources.add(v);
+      List<String> cloudInfoSourcesList = _record.getListField(CloudConfigProperty.CLOUD_INFO_SOURCE.name());
+      cloudInfoSourcesList.add(cloudInfoSource);
+      _record.setListField(CloudConfigProperty.CLOUD_INFO_SOURCE.name(), cloudInfoSourcesList);
       return this;
     }
 
-    public Builder setCloudInfoProcessorName(String v) {
-      _cloudInfoProcessorName = v;
+    public Builder setCloudInfoProcessorName(String cloudInfoProcessorName) {
+      _record.setSimpleField(CloudConfigProperty.CLOUD_INFO_PROCESSOR_NAME.name(),
+          cloudInfoProcessorName);
       return this;
     }
 
-    public String getClusterName() {
-      return _clusterName;
-    }
-
-    public CloudProvider getCloudProvider() {
-      return _cloudProvider;
+    public String getCloudProvider() {
+      return _record.getSimpleField(CloudConfigProperty.CLOUD_PROVIDER.name());
     }
 
     public boolean getCloudEnabled() {
-      return _cloudEnabled;
+      return _record.getBooleanField(CloudConfigProperty.CLOUD_ENABLED.name(),
+          DEFAULT_CLOUD_ENABLED);
     }
 
     public String getCloudID() {
-      return _cloudID;
+      return _record.getSimpleField(CloudConfigProperty.CLOUD_ID.name());
     }
 
     public List<String> getCloudInfoSources() {
-      return _cloudInfoSources;
+      return _record.getListField(CloudConfigProperty.CLOUD_INFO_SOURCE.name());
     }
 
     public String getCloudInfoProcessorName() {
-      return _cloudInfoProcessorName;
+      return _record.getSimpleField(CloudConfigProperty.CLOUD_INFO_PROCESSOR_NAME.name());
     }
 
     private void validate() {
-      if (_cloudEnabled) {
-        if (_cloudID == null) {
-          throw new HelixException(
-              "This Cloud Configuration is Invalid. The CloudID is missing from the config.");
-        }
-        if (_cloudProvider == null) {
+      if (this.getCloudProvider() == null) {
+        throw new HelixException(
+            "This Cloud Configuration is Invalid. The Cloud Provider is missing from the config.");
+      } else if (this.getCloudProvider().equals(CloudProvider.CUSTOMIZED.name())) {
+        if (this.getCloudInfoProcessorName() == null || this.getCloudInfoSources() == null
+            || this.getCloudInfoSources().size() == 0) {
           throw new HelixException(
-              "This Cloud Configuration is Invalid. The Cloud Provider is missing from the config.");
-        } else if (_cloudProvider == CloudProvider.CUSTOMIZED) {
-          if (_cloudInfoProcessorName == null || _cloudInfoSources == null || _cloudInfoSources.size() == 0) {
-            throw new HelixException(
-                "This Cloud Configuration is Invalid. CUSTOMIZED provider has been chosen without defining CloudInfoProcessorName or CloudInfoSources");
-          }
+              "This Cloud Configuration is Invalid. CUSTOMIZED provider has been chosen without defining CloudInfoProcessorName or CloudInfoSources");
         }
       }
     }
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 5d5f864..d1dc6d3 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -41,7 +41,9 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.cloud.constants.CloudProvider;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -49,6 +51,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -176,15 +179,23 @@ public class ClusterSetup {
     }
   }
 
-  public void addCluster(String clusterName, boolean overwritePrevious) {
+  public void addCluster(String clusterName, boolean overwritePrevious, CloudConfig cloudConfig)
+      throws HelixException {
     _admin.addCluster(clusterName, overwritePrevious);
-
     for (BuiltInStateModelDefinitions def : BuiltInStateModelDefinitions.values()) {
       addStateModelDef(clusterName, def.getStateModelDefinition().getId(),
-                       def.getStateModelDefinition(), overwritePrevious);
+          def.getStateModelDefinition(), overwritePrevious);
+    }
+
+    if (cloudConfig != null) {
+      _admin.addCloudConfig(clusterName, cloudConfig);
     }
   }
 
+  public void addCluster(String clusterName, boolean overwritePrevious) {
+    addCluster(clusterName, overwritePrevious, null);
+  }
+
   public void activateCluster(String clusterName, String grandCluster, boolean enable) {
     if (enable) {
       _admin.addClusterToGrandCluster(clusterName, grandCluster);
diff --git a/helix-core/src/test/java/org/apache/helix/TestConfigAccessor.java b/helix-core/src/test/java/org/apache/helix/TestConfigAccessor.java
index 0fa05c3..e62bf81 100644
--- a/helix-core/src/test/java/org/apache/helix/TestConfigAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/TestConfigAccessor.java
@@ -19,14 +19,18 @@ package org.apache.helix;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
+import org.apache.helix.cloud.constants.CloudProvider;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.tools.ClusterSetup;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -68,9 +72,8 @@ public class TestConfigAccessor extends ZkUnitTestBase {
     Assert.assertEquals(resourceConfigValue, "resourceConfigValue");
 
     // partition scope config
-    ConfigScope partitionScope =
-        new ConfigScopeBuilder().forCluster(clusterName).forResource("testResource")
-            .forPartition("testPartition").build();
+    ConfigScope partitionScope = new ConfigScopeBuilder().forCluster(clusterName)
+        .forResource("testResource").forPartition("testPartition").build();
     configAccessor.set(partitionScope, "partitionConfigKey", "partitionConfigValue");
     String partitionConfigValue = configAccessor.get(partitionScope, "partitionConfigKey");
     Assert.assertEquals(partitionConfigValue, "partitionConfigValue");
@@ -113,9 +116,8 @@ public class TestConfigAccessor extends ZkUnitTestBase {
         "should be [HELIX_ENABLED, HELIX_ENABLED_TIMESTAMP, HELIX_HOST, HELIX_PORT, participantConfigKey]");
     Assert.assertEquals(keys.get(4), "participantConfigKey");
 
-    keys =
-        configAccessor.getKeys(ConfigScopeProperty.PARTITION, clusterName, "testResource",
-            "testPartition");
+    keys = configAccessor.getKeys(ConfigScopeProperty.PARTITION, clusterName, "testResource",
+        "testPartition");
     Assert.assertEquals(keys.size(), 1, "should be [partitionConfigKey]");
     Assert.assertEquals(keys.get(0), "partitionConfigKey");
 
@@ -183,8 +185,8 @@ public class TestConfigAccessor extends ZkUnitTestBase {
 
     try {
       configAccessor.set(participantScope, "participantConfigKey", "participantConfigValue");
-      Assert
-          .fail("Except fail to set participant-config because participant: localhost_12918 is not added to cluster yet");
+      Assert.fail(
+          "Except fail to set participant-config because participant: localhost_12918 is not added to cluster yet");
     } catch (HelixException e) {
       // OK
     }
@@ -193,8 +195,8 @@ public class TestConfigAccessor extends ZkUnitTestBase {
     try {
       configAccessor.set(participantScope, "participantConfigKey", "participantConfigValue");
     } catch (Exception e) {
-      Assert
-          .fail("Except succeed to set participant-config because participant: localhost_12918 has been added to cluster");
+      Assert.fail(
+          "Except succeed to set participant-config because participant: localhost_12918 has been added to cluster");
     }
 
     String participantConfigValue = configAccessor.get(participantScope, "participantConfigKey");
@@ -204,4 +206,48 @@ public class TestConfigAccessor extends ZkUnitTestBase {
     configAccessor.close();
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+
+  @Test
+  public void testUpdateCloudConfig() throws Exception {
+    ClusterSetup _clusterSetup = new ClusterSetup(ZK_ADDR);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    CloudConfig.Builder cloudConfigInitBuilder = new CloudConfig.Builder();
+    cloudConfigInitBuilder.setCloudEnabled(true);
+    cloudConfigInitBuilder.setCloudID("TestCloudID");
+    List<String> sourceList = new ArrayList<String>();
+    sourceList.add("TestURL");
+    cloudConfigInitBuilder.setCloudInfoSources(sourceList);
+    cloudConfigInitBuilder.setCloudInfoProcessorName("TestProcessor");
+    cloudConfigInitBuilder.setCloudProvider(CloudProvider.CUSTOMIZED);
+    CloudConfig cloudConfigInit = cloudConfigInitBuilder.build();
+
+    _clusterSetup.addCluster(clusterName, false, cloudConfigInit);
+
+    // Read CloudConfig from Zookeeper and check the content
+    ConfigAccessor _configAccessor = new ConfigAccessor(_gZkClient);
+    CloudConfig cloudConfigFromZk = _configAccessor.getCloudConfig(clusterName);
+    Assert.assertTrue(cloudConfigFromZk.isCloudEnabled());
+    Assert.assertEquals(cloudConfigFromZk.getCloudID(), "TestCloudID");
+    List<String> listUrlFromZk = cloudConfigFromZk.getCloudInfoSources();
+    Assert.assertEquals(listUrlFromZk.get(0), "TestURL");
+    Assert.assertEquals(cloudConfigFromZk.getCloudInfoProcessorName(), "TestProcessor");
+    Assert.assertEquals(cloudConfigFromZk.getCloudProvider(), CloudProvider.CUSTOMIZED.name());
+
+    // Change the processor name and check if processor name has been changed in Zookeeper.
+    cloudConfigInitBuilder.setCloudInfoProcessorName("TestProcessor2");
+    cloudConfigInit = cloudConfigInitBuilder.build();
+    ZKHelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCloudConfig(clusterName, cloudConfigInit);
+
+    cloudConfigFromZk = _configAccessor.getCloudConfig(clusterName);
+    Assert.assertTrue(cloudConfigFromZk.isCloudEnabled());
+    Assert.assertEquals(cloudConfigFromZk.getCloudID(), "TestCloudID");
+    listUrlFromZk = cloudConfigFromZk.getCloudInfoSources();
+    Assert.assertEquals(listUrlFromZk.get(0), "TestURL");
+    Assert.assertEquals(cloudConfigFromZk.getCloudInfoProcessorName(), "TestProcessor2");
+    Assert.assertEquals(cloudConfigFromZk.getCloudProvider(), CloudProvider.CUSTOMIZED.name());
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 20acef4..69fa558 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -39,7 +40,9 @@ import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.cloud.constants.CloudProvider;
 import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -506,4 +509,70 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
         .getListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()).size(),
         2);
   }
+
+  @Test
+  public void testAddCloudConfig() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+
+    CloudConfig.Builder builder = new CloudConfig.Builder();
+    builder.setCloudEnabled(true);
+    builder.setCloudID("TestID");
+    builder.addCloudInfoSource("TestURL");
+    builder.setCloudProvider(CloudProvider.CUSTOMIZED);
+    builder.setCloudInfoProcessorName("TestProcessor");
+    CloudConfig cloudConfig = builder.build();
+
+    admin.addCloudConfig(clusterName, cloudConfig);
+
+    // Read CloudConfig from Zookeeper and check the content
+    ConfigAccessor _configAccessor = new ConfigAccessor(_gZkClient);
+    CloudConfig cloudConfigFromZk = _configAccessor.getCloudConfig(clusterName);
+    Assert.assertTrue(cloudConfigFromZk.isCloudEnabled());
+    Assert.assertEquals(cloudConfigFromZk.getCloudID(), "TestID");
+    Assert.assertEquals(cloudConfigFromZk.getCloudProvider(), CloudProvider.CUSTOMIZED.name());
+    List<String> listUrlFromZk = cloudConfigFromZk.getCloudInfoSources();
+    Assert.assertEquals(listUrlFromZk.get(0), "TestURL");
+    Assert.assertEquals(cloudConfigFromZk.getCloudInfoProcessorName(), "TestProcessor");
+  }
+
+
+  @Test
+  public void testRemoveCloudConfig() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+
+    CloudConfig.Builder builder = new CloudConfig.Builder();
+    builder.setCloudEnabled(true);
+    builder.setCloudID("TestID");
+    builder.addCloudInfoSource("TestURL");
+    builder.setCloudProvider(CloudProvider.CUSTOMIZED);
+    builder.setCloudInfoProcessorName("TestProcessor");
+    CloudConfig cloudConfig = builder.build();
+
+    admin.addCloudConfig(clusterName, cloudConfig);
+
+    // Read CloudConfig from Zookeeper and check the content
+    ConfigAccessor _configAccessor = new ConfigAccessor(_gZkClient);
+    CloudConfig cloudConfigFromZk = _configAccessor.getCloudConfig(clusterName);
+    Assert.assertTrue(cloudConfigFromZk.isCloudEnabled());
+    Assert.assertEquals(cloudConfigFromZk.getCloudID(), "TestID");
+    Assert.assertEquals(cloudConfigFromZk.getCloudProvider(), CloudProvider.CUSTOMIZED.name());
+    List<String> listUrlFromZk = cloudConfigFromZk.getCloudInfoSources();
+    Assert.assertEquals(listUrlFromZk.get(0), "TestURL");
+    Assert.assertEquals(cloudConfigFromZk.getCloudInfoProcessorName(), "TestProcessor");
+
+    // Remove Cloud Config and make sure it has been removed from Zookeeper
+    admin.removeCloudConfig(clusterName);
+    cloudConfigFromZk = _configAccessor.getCloudConfig(clusterName);
+    Assert.assertNull(cloudConfigFromZk);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index b299ab1..76e219b 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ConstraintItem;
@@ -313,6 +314,16 @@ public class MockHelixAdmin implements HelixAdmin {
 
   }
 
+  @Override
+  public void addCloudConfig(String clusterName, CloudConfig cloudConfig) {
+
+  }
+
+  @Override
+  public void removeCloudConfig(String clusterName) {
+
+  }
+
   @Override public List<String> getStateModelDefs(String clusterName) {
     return null;
   }
diff --git a/helix-core/src/test/java/org/apache/helix/model/cloud/TestCloudConfig.java b/helix-core/src/test/java/org/apache/helix/model/cloud/TestCloudConfig.java
index ee83011..01945de 100644
--- a/helix-core/src/test/java/org/apache/helix/model/cloud/TestCloudConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/cloud/TestCloudConfig.java
@@ -57,20 +57,22 @@ public class TestCloudConfig extends ZkUnitTestBase {
   }
 
   @Test(dependsOnMethods = "testCloudConfigNull")
-  public void testCloudConfig() {
+  public void testCloudConfig() throws Exception {
     String className = getShortClassName();
     String clusterName = "CLUSTER_" + className;
     TestHelper.setupEmptyCluster(_gZkClient, clusterName);
 
     // Create dummy CloudConfig object
-    CloudConfig cloudConfig = new CloudConfig(clusterName);
-    cloudConfig.setCloudEnabled(true);
-    cloudConfig.setCloudProvider(CloudProvider.AZURE);
-    cloudConfig.setCloudID("TestID");
+    CloudConfig.Builder cloudConfigBuilder = new CloudConfig.Builder();
+    cloudConfigBuilder.setCloudEnabled(true);
+    cloudConfigBuilder.setCloudProvider(CloudProvider.AZURE);
+    cloudConfigBuilder.setCloudID("TestID");
     List<String> infoURL = new ArrayList<String>();
     infoURL.add("TestURL");
-    cloudConfig.setCloudInfoSource(infoURL);
-    cloudConfig.setCloudInfoFProcessorName("TestProcessor");
+    cloudConfigBuilder.setCloudInfoSources(infoURL);
+    cloudConfigBuilder.setCloudInfoProcessorName("TestProcessor");
+
+    CloudConfig cloudConfig = cloudConfigBuilder.build();
 
     // Write the CloudConfig to Zookeeper
     ZKHelixDataAccessor accessor =
@@ -84,8 +86,8 @@ public class TestCloudConfig extends ZkUnitTestBase {
     Assert.assertTrue(cloudConfigFromZk.isCloudEnabled());
     Assert.assertEquals(cloudConfigFromZk.getCloudProvider(), CloudProvider.AZURE.name());
     Assert.assertEquals(cloudConfigFromZk.getCloudID(), "TestID");
+
     Assert.assertEquals(cloudConfigFromZk.getCloudInfoSources().size(), 1);
-    Assert.assertEquals(cloudConfigFromZk.getCloudInfoSources().get(0), "TestURL");
     Assert.assertEquals(cloudConfigFromZk.getCloudInfoProcessorName(), "TestProcessor");
   }
 
@@ -93,7 +95,7 @@ public class TestCloudConfig extends ZkUnitTestBase {
   public void testUnverifiedCloudConfigBuilder() {
     String className = getShortClassName();
     String clusterName = "CLUSTER_" + className;
-    CloudConfig.Builder builder = new CloudConfig.Builder(clusterName);
+    CloudConfig.Builder builder = new CloudConfig.Builder();
     builder.setCloudEnabled(true);
     // Verify will fail because cloudID has net been defined.
     CloudConfig cloudConfig = builder.build();
@@ -103,7 +105,7 @@ public class TestCloudConfig extends ZkUnitTestBase {
   public void testUnverifiedCloudConfigBuilderEmptySources() {
     String className = getShortClassName();
     String clusterName = "CLUSTER_" + className;
-    CloudConfig.Builder builder = new CloudConfig.Builder(clusterName);
+    CloudConfig.Builder builder = new CloudConfig.Builder();
     builder.setCloudEnabled(true);
     builder.setCloudProvider(CloudProvider.CUSTOMIZED);
     builder.setCloudID("TestID");
@@ -115,9 +117,7 @@ public class TestCloudConfig extends ZkUnitTestBase {
 
   @Test(expectedExceptions = HelixException.class)
   public void testUnverifiedCloudConfigBuilderWithoutProcessor() {
-    String className = getShortClassName();
-    String clusterName = "CLUSTER_" + className;
-    CloudConfig.Builder builder = new CloudConfig.Builder(clusterName);
+    CloudConfig.Builder builder = new CloudConfig.Builder();
     builder.setCloudEnabled(true);
     builder.setCloudProvider(CloudProvider.CUSTOMIZED);
     builder.setCloudID("TestID");
@@ -132,7 +132,7 @@ public class TestCloudConfig extends ZkUnitTestBase {
     String className = getShortClassName();
     String clusterName = "CLUSTER_" + className;
     TestHelper.setupEmptyCluster(_gZkClient, clusterName);
-    CloudConfig.Builder builder = new CloudConfig.Builder(clusterName);
+    CloudConfig.Builder builder = new CloudConfig.Builder();
     builder.setCloudEnabled(true);
     builder.setCloudProvider(CloudProvider.CUSTOMIZED);
     builder.setCloudID("TestID");
@@ -142,8 +142,7 @@ public class TestCloudConfig extends ZkUnitTestBase {
 
     // Check builder getter methods
     Assert.assertTrue(builder.getCloudEnabled());
-    Assert.assertEquals(builder.getCloudProvider(), CloudProvider.CUSTOMIZED);
-    Assert.assertEquals(builder.getClusterName(), clusterName);
+    Assert.assertEquals(builder.getCloudProvider(), CloudProvider.CUSTOMIZED.name());
     Assert.assertEquals(builder.getCloudID(), "TestID");
     List<String> listUrlFromBuilder = builder.getCloudInfoSources();
     Assert.assertEquals(listUrlFromBuilder.size(), 2);
@@ -175,15 +174,14 @@ public class TestCloudConfig extends ZkUnitTestBase {
     String className = getShortClassName();
     String clusterName = "CLUSTER_" + className;
     TestHelper.setupEmptyCluster(_gZkClient, clusterName);
-    CloudConfig.Builder builder = new CloudConfig.Builder(clusterName);
+    CloudConfig.Builder builder = new CloudConfig.Builder();
     builder.setCloudEnabled(true);
     builder.setCloudProvider(CloudProvider.AZURE);
     builder.setCloudID("TestID");
-    builder.setCloudInfoProcessorName("TestProcessor");
 
     // Check builder getter methods
     Assert.assertTrue(builder.getCloudEnabled());
-    Assert.assertEquals(builder.getCloudProvider(), CloudProvider.AZURE);
+    Assert.assertEquals(builder.getCloudProvider(), CloudProvider.AZURE.name());
 
     CloudConfig cloudConfig = builder.build();
 
@@ -198,7 +196,29 @@ public class TestCloudConfig extends ZkUnitTestBase {
     Assert.assertTrue(cloudConfigFromZk.isCloudEnabled());
     Assert.assertEquals(cloudConfigFromZk.getCloudProvider(), CloudProvider.AZURE.name());
 
-    // Since CloudProvider is not CUSTOMIZED, CloudInfoProcessor will be null.
+    // Since user does not set the CloudInfoProcessorName, this field will be null.
     Assert.assertNull(cloudConfigFromZk.getCloudInfoProcessorName());
+
+    // Checking the set method in CloudConfig
+    cloudConfig.setCloudEnabled(false);
+    accessor.setProperty(keyBuilder.cloudConfig(), cloudConfig);
+    cloudConfigFromZk = _configAccessor.getCloudConfig(clusterName);
+    Assert.assertFalse(cloudConfigFromZk.isCloudEnabled());
+
+    cloudConfig.setCloudEnabled(true);
+    cloudConfig.setCloudID("TestID2");
+    List<String> sourceList = new ArrayList<String>();
+    sourceList.add("TestURL0");
+    sourceList.add("TestURL1");
+    cloudConfig.setCloudInfoSource(sourceList);
+    accessor.setProperty(keyBuilder.cloudConfig(), cloudConfig);
+
+    cloudConfigFromZk = _configAccessor.getCloudConfig(clusterName);
+    Assert.assertTrue(cloudConfigFromZk.isCloudEnabled());
+    Assert.assertEquals(cloudConfigFromZk.getCloudProvider(), CloudProvider.AZURE.name());
+    Assert.assertEquals(cloudConfigFromZk.getCloudID(), "TestID2");
+    List<String> listUrlFromZk = cloudConfigFromZk.getCloudInfoSources();
+    Assert.assertEquals(listUrlFromZk.get(0), "TestURL0");
+    Assert.assertEquals(listUrlFromZk.get(1), "TestURL1");
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
index df3f341..68195bd 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
@@ -19,10 +19,13 @@ package org.apache.helix.tools;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
@@ -31,15 +34,15 @@ import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.cloud.constants.CloudProvider;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
@@ -48,8 +51,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class TestClusterSetup extends ZkUnitTestBase {
-  private static Logger LOG = LoggerFactory.getLogger(TestClusterSetup.class);
-
   protected static final String CLUSTER_NAME = "TestClusterSetup";
   protected static final String TEST_DB = "TestDB";
   protected static final String INSTANCE_PREFIX = "instance_";
@@ -436,4 +437,83 @@ public class TestClusterSetup extends ZkUnitTestBase {
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
+
+  @Test(expectedExceptions = HelixException.class)
+  public void testAddClusterWithInvalidCloudConfig() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    CloudConfig.Builder cloudConfigInitBuilder = new CloudConfig.Builder();
+    cloudConfigInitBuilder.setCloudEnabled(true);
+    List<String> sourceList = new ArrayList<String>();
+    sourceList.add("TestURL");
+    cloudConfigInitBuilder.setCloudInfoSources(sourceList);
+    cloudConfigInitBuilder.setCloudProvider(CloudProvider.CUSTOMIZED);
+
+    CloudConfig cloudConfigInit = cloudConfigInitBuilder.build();
+
+
+    // Since setCloudInfoProcessorName is missing, this add cluster call will throw an exception
+    _clusterSetup.addCluster(clusterName, false, cloudConfigInit);
+  }
+
+  @Test(dependsOnMethods = "testAddClusterWithInvalidCloudConfig")
+  public void testAddClusterWithValidCloudConfig() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    CloudConfig.Builder cloudConfigInitBuilder = new CloudConfig.Builder();
+    cloudConfigInitBuilder.setCloudEnabled(true);
+    cloudConfigInitBuilder.setCloudID("TestID");
+    List<String> sourceList = new ArrayList<String>();
+    sourceList.add("TestURL");
+    cloudConfigInitBuilder.setCloudInfoSources(sourceList);
+    cloudConfigInitBuilder.setCloudInfoProcessorName("TestProcessorName");
+    cloudConfigInitBuilder.setCloudProvider(CloudProvider.CUSTOMIZED);
+
+    CloudConfig cloudConfigInit = cloudConfigInitBuilder.build();
+
+    _clusterSetup.addCluster(clusterName, false, cloudConfigInit);
+
+    // Read CloudConfig from Zookeeper and check the content
+    ConfigAccessor _configAccessor = new ConfigAccessor(_gZkClient);
+    CloudConfig cloudConfigFromZk = _configAccessor.getCloudConfig(clusterName);
+    Assert.assertTrue(cloudConfigFromZk.isCloudEnabled());
+    Assert.assertEquals(cloudConfigFromZk.getCloudID(), "TestID");
+    List<String> listUrlFromZk = cloudConfigFromZk.getCloudInfoSources();
+    Assert.assertEquals(listUrlFromZk.get(0), "TestURL");
+    Assert.assertEquals(cloudConfigFromZk.getCloudInfoProcessorName(), "TestProcessorName");
+    Assert.assertEquals(cloudConfigFromZk.getCloudProvider(), CloudProvider.CUSTOMIZED.name());
+  }
+
+
+  @Test(dependsOnMethods = "testAddClusterWithValidCloudConfig")
+  public void testAddClusterAzureProvider() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    CloudConfig.Builder cloudConfigInitBuilder = new CloudConfig.Builder();
+    cloudConfigInitBuilder.setCloudEnabled(true);
+    cloudConfigInitBuilder.setCloudID("TestID");
+    cloudConfigInitBuilder.setCloudProvider(CloudProvider.AZURE);
+
+    CloudConfig cloudConfigInit = cloudConfigInitBuilder.build();
+
+    _clusterSetup.addCluster(clusterName, false, cloudConfigInit);
+
+    // Read CloudConfig from Zookeeper and check the content
+    ConfigAccessor _configAccessor = new ConfigAccessor(_gZkClient);
+    CloudConfig cloudConfigFromZk = _configAccessor.getCloudConfig(clusterName);
+    Assert.assertTrue(cloudConfigFromZk.isCloudEnabled());
+    Assert.assertEquals(cloudConfigFromZk.getCloudID(), "TestID");
+    List<String> listUrlFromZk = cloudConfigFromZk.getCloudInfoSources();
+
+    // Since provider is not customized, CloudInfoSources and CloudInfoProcessorName will be null.
+    Assert.assertNull(listUrlFromZk);
+    Assert.assertNull(cloudConfigFromZk.getCloudInfoProcessorName());
+    Assert.assertEquals(cloudConfigFromZk.getCloudProvider(), CloudProvider.AZURE.name());
+  }
 }