You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/05/23 21:28:47 UTC

[1/2] helix git commit: Add support of setting/updating Cluster/Resource/Instance configs in ConfigAccessor.

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 8cf80a9f8 -> 8ba068e7b


Add support of setting/updating Cluster/Resource/Instance configs in ConfigAccessor.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7c92bf54
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7c92bf54
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7c92bf54

Branch: refs/heads/helix-0.6.x
Commit: 7c92bf543571daac555f5d8b933805dddced7ca5
Parents: 8cf80a9
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue May 23 12:27:23 2017 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue May 23 12:27:23 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/ConfigAccessor.java   | 248 +++++++++++++++++++
 .../main/java/org/apache/helix/ZNRecord.java    |  19 ++
 .../java/org/apache/helix/ZNRecordDelta.java    |   5 +-
 .../org/apache/helix/manager/zk/ZKUtil.java     |  31 ++-
 .../org/apache/helix/model/ClusterConfig.java   |  15 ++
 .../integration/ZkIntegrationTestBase.java      |  12 +-
 .../org/apache/helix/manager/zk/TestZKUtil.java | 113 ++++++---
 7 files changed, 401 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 34aef49..27a30cb 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -27,11 +27,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.util.StringTemplate;
 import org.apache.log4j.Logger;
 
@@ -509,4 +513,248 @@ public class ConfigAccessor {
     }
     return retKeys;
   }
+
+  private ZNRecord getConfigZnRecord(HelixConfigScope scope) {
+    String clusterName = scope.getClusterName();
+    if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
+      throw new HelixException("fail to get configs. cluster " + clusterName + " is not setup yet");
+    }
+
+    return zkClient.readData(scope.getZkPath(), true);
+  }
+
+  /**
+   * Get ClusterConfig of the given cluster.
+   *
+   * @param clusterName
+   *
+   * @return
+   */
+  public ClusterConfig getClusterConfig(String clusterName) {
+    if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
+      throw new HelixException("fail to get config. cluster: " + clusterName + " is NOT setup.");
+    }
+
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+    ZNRecord record = getConfigZnRecord(scope);
+
+    if (record == null) {
+      LOG.warn("No config found at " + scope.getZkPath());
+      return null;
+    }
+
+    return new ClusterConfig(record);
+  }
+
+  /**
+   * Set ClusterConfig of the given cluster.
+   * The current Cluster config will be replaced with the given clusterConfig.
+   * WARNING: This is not thread-safe or concurrent updates safe.
+   *
+   * @param clusterName
+   * @param clusterConfig
+   *
+   * @return
+   */
+  public void setClusterConfig(String clusterName, ClusterConfig clusterConfig) {
+    updateClusterConfig(clusterName, clusterConfig, true);
+  }
+
+  /**
+   * Update ClusterConfig of the given cluster.
+   * A field that is present in the given config replaces the value of the same field in the current
+   * config. A field that is present in the given config but not in the current config is added to
+   * the current config.
+   * Each list field and map field is replaced as a whole entry (its contents are not merged).
+   *
+   * Fields of the current cluster config that are absent from the given clusterConfig are kept.
+   * WARNING: This is not thread-safe or concurrent updates safe.
+   *
+   * @param clusterName
+   * @param clusterConfig
+   *
+   * @return
+   */
+  public void updateClusterConfig(String clusterName, ClusterConfig clusterConfig) {
+    updateClusterConfig(clusterName, clusterConfig, false);
+  }
+
+
+  private void updateClusterConfig(String clusterName, ClusterConfig clusterConfig, boolean overwrite) {
+    if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
+      throw new HelixException("fail to update config. cluster: " + clusterName + " is NOT setup.");
+    }
+
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+    String zkPath = scope.getZkPath();
+
+    if (overwrite) {
+      ZKUtil.createOrReplace(zkClient, zkPath, clusterConfig.getRecord(), true);
+    } else {
+      ZKUtil.createOrUpdate(zkClient, zkPath, clusterConfig.getRecord(), true, true);
+    }
+  }
+
+  /**
+   * Get resource config for given resource in given cluster.
+   *
+   * @param clusterName
+   * @param resourceName
+   *
+   * @return
+   */
+  public ResourceConfig getResourceConfig(String clusterName, String resourceName) {
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(clusterName)
+            .forResource(resourceName).build();
+    ZNRecord record = getConfigZnRecord(scope);
+
+    if (record == null) {
+      LOG.warn("No config found at " + scope.getZkPath());
+      return null;
+    }
+
+    return new ResourceConfig(record);
+  }
+
+  /**
+   * Set config of the given resource.
+   * The current Resource config will be replaced with the given resourceConfig.
+   *
+   * WARNING: This is not thread-safe or concurrent updates safe.
+   *
+   * @param clusterName
+   * @param resourceName
+   * @param resourceConfig
+   *
+   * @return
+   */
+  public void setResourceConfig(String clusterName, String resourceName,
+      ResourceConfig resourceConfig) {
+    updateResourceConfig(clusterName, resourceName, resourceConfig, true);
+  }
+
+  /**
+   * Update ResourceConfig of the given resource.
+   * A field that is present in the given config replaces the value of the same field in the current
+   * config. A field that is present in the given config but not in the current config is added to
+   * the current config.
+   * Each list field and map field is replaced as a whole entry (its contents are not merged).
+   *
+   * Fields of the current resource config that are absent from the given resourceConfig are kept.
+   * WARNING: This is not thread-safe or concurrent updates safe.
+   *
+   * @param clusterName
+   * @param resourceName
+   * @param resourceConfig
+   *
+   * @return
+   */
+  public void updateResourceConfig(String clusterName, String resourceName,
+      ResourceConfig resourceConfig) {
+    updateResourceConfig(clusterName, resourceName, resourceConfig, false);
+  }
+
+  private void updateResourceConfig(String clusterName, String resourceName,
+      ResourceConfig resourceConfig, boolean overwrite) {
+    if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
+      throw new HelixException("fail to setup config. cluster: " + clusterName + " is NOT setup.");
+    }
+
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(clusterName)
+            .forResource(resourceName).build();
+    String zkPath = scope.getZkPath();
+
+    if (overwrite) {
+      ZKUtil.createOrReplace(zkClient, zkPath, resourceConfig.getRecord(), true);
+    } else {
+      ZKUtil.createOrUpdate(zkClient, zkPath, resourceConfig.getRecord(), true, true);
+    }
+  }
+
+  /**
+   * Get instance config for given instance in given cluster.
+   *
+   * @param clusterName
+   * @param instanceName
+   *
+   * @return
+   */
+  public InstanceConfig getInstanceConfig(String clusterName, String instanceName) {
+    if (!ZKUtil.isInstanceSetup(zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) {
+      throw new HelixException(
+          "fail to get config. instance: " + instanceName + " is NOT setup in cluster: "
+              + clusterName);
+    }
+
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT).forCluster(clusterName)
+            .forParticipant(instanceName).build();
+    ZNRecord record = getConfigZnRecord(scope);
+
+    if (record == null) {
+      LOG.warn("No config found at " + scope.getZkPath());
+      return null;
+    }
+
+    return new InstanceConfig(record);
+  }
+
+  /**
+   * Set config of the given instance.
+   * The current instance config will be replaced with the given instanceConfig.
+   * WARNING: This is not thread-safe or concurrent updates safe.
+   *
+   * @param clusterName
+   * @param instanceName
+   * @param instanceConfig
+   *
+   * @return
+   */
+  public void setInstanceConfig(String clusterName, String instanceName,
+      InstanceConfig instanceConfig) {
+    updateInstanceConfig(clusterName, instanceName, instanceConfig, true);
+
+  }
+
+  /**
+   * Update InstanceConfig of the given instance. A field that is present in the given config
+   * replaces the value of the same field in the current config. A field that is present in the
+   * given config but not in the current config is added to the current config.
+   * Each list field and map field is replaced as a whole entry (its contents are not merged).
+   * Fields of the current instance config that are absent from the given instanceConfig are kept.
+   * WARNING: This is not thread-safe or concurrent updates safe.
+   *
+   *
+   * @param clusterName
+   * @param instanceName
+   * @param instanceConfig
+   *
+   * @return
+   */
+  public void updateInstanceConfig(String clusterName, String instanceName,
+      InstanceConfig instanceConfig) {
+    updateInstanceConfig(clusterName, instanceName, instanceConfig, false);
+  }
+
+  private void updateInstanceConfig(String clusterName, String instanceName,
+      InstanceConfig instanceConfig, boolean overwrite) {
+    if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
+      throw new HelixException("fail to setup config. cluster: " + clusterName + " is NOT setup.");
+    }
+
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT).forCluster(clusterName)
+            .forParticipant(instanceName).build();
+    String zkPath = scope.getZkPath();
+
+    if (overwrite) {
+      ZKUtil.createOrReplace(zkClient, zkPath, instanceConfig.getRecord(), true);
+    } else {
+      ZKUtil.createOrUpdate(zkClient, zkPath, instanceConfig.getRecord(), true, true);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/main/java/org/apache/helix/ZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
index 3ac9485..52bd5be 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -512,7 +512,24 @@ public class ZNRecord {
   }
 
   /**
+   * Update this ZNRecord with the given ZNRecord. The value of a field in this record is replaced
+   * with the value of the same field in the given record if it is present there. A field that is
+   * in the given ZNRecord but not in this record is added to this record. Each list field and map
+   * field is replaced as a whole entry. A null record is ignored.
+   *
+   * @param record
+   */
+  public void update(ZNRecord record) {
+    if (record != null) {
+      simpleFields.putAll(record.simpleFields);
+      listFields.putAll(record.listFields);
+      mapFields.putAll(record.mapFields);
+    }
+  }
+
+  /**
    * Merge in a {@link ZNRecordDelta} corresponding to its merge policy
+   *
    * @param delta
    */
   void merge(ZNRecordDelta delta) {
@@ -520,6 +537,8 @@ public class ZNRecord {
       merge(delta.getRecord());
     } else if (delta.getMergeOperation() == MergeOperation.SUBTRACT) {
       subtract(delta.getRecord());
+    } else if (delta.getMergeOperation() == MergeOperation.UPDATE) {
+      update(delta.getRecord());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java b/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
index eff725d..616e1f5 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
@@ -28,7 +28,8 @@ public class ZNRecordDelta {
    */
   public enum MergeOperation {
     ADD,
-    SUBTRACT
+    SUBTRACT,
+    UPDATE
   };
 
   /**
@@ -44,7 +45,7 @@ public class ZNRecordDelta {
   /**
    * Initialize the delta with a record and the update mode
    * @param record
-   * @param _mergeOperation
+   * @param mergeOperation
    */
   public ZNRecordDelta(ZNRecord record, MergeOperation mergeOperation) {
     _record = new ZNRecord(record);

http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 9131e5b..2b4cfb2 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -196,7 +196,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void createOrUpdate(ZkClient client, String path, final ZNRecord record,
+  public static void createOrMerge(ZkClient client, String path, final ZNRecord record,
       final boolean persistent, final boolean mergeOnUpdate) {
     int retryCount = 0;
     while (retryCount < RETRYLIMIT) {
@@ -232,6 +232,35 @@ public final class ZKUtil {
     }
   }
 
+  public static void createOrUpdate(ZkClient client, String path, final ZNRecord record,
+      final boolean persistent, final boolean mergeOnUpdate) {
+    int retryCount = 0;
+    while (retryCount < RETRYLIMIT) {
+      try {
+        if (client.exists(path)) {
+          DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+            @Override public ZNRecord update(ZNRecord currentData) {
+              if (currentData != null && mergeOnUpdate) {
+                currentData.update(record);
+                return currentData;
+              }
+              return record;
+            }
+          };
+          client.updateDataSerialized(path, updater);
+        } else {
+          CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
+          client.create(path, record, mode);
+        }
+        break;
+      } catch (Exception e) {
+        retryCount = retryCount + 1;
+        logger.warn("Exception trying to update " + path + " Exception:" + e.getMessage()
+            + ". Will retry.");
+      }
+    }
+  }
+
   public static void asyncCreateOrUpdate(ZkClient client, String path, final ZNRecord record,
       final boolean persistent, final boolean mergeOnUpdate) {
     try {

http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 2be7ee1..3796f36 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -68,6 +68,21 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Enable/Disable persisting the best possible assignment in a resource's idealstate.
+   *
+   * @param enable true or false to set the flag; null to remove the flag from this config
+   */
+  public void setPersistBestPossibleAssignment(Boolean enable) {
+    if (enable == null) {
+      _record.getSimpleFields()
+          .remove(ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.toString());
+    } else {
+      _record.setBooleanField(ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.toString(),
+          enable);
+    }
+  }
+
+  /**
    *
    * @return
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index 0edd4d3..08a5730 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -104,15 +104,11 @@ public class ZkIntegrationTestBase {
   }
 
   protected void enablePersistBestPossibleAssignment(ZkClient zkClient, String clusterName,
-      Boolean enable) {
+      Boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
-    HelixConfigScope clusterScope =
-        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
-            .forCluster(clusterName).build();
-
-    configAccessor.set(clusterScope,
-        ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(),
-        enable.toString());
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setPersistBestPossibleAssignment(enabled);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
   protected void disableDelayRebalanceInCluster(ZkClient zkClient, String clusterName,

http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
index ed604fa..19d30a6 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
@@ -19,16 +19,15 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
-
+import java.util.Map;
 import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.log4j.Logger;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
@@ -42,7 +41,7 @@ public class TestZKUtil extends ZkUnitTestBase {
   ZkClient _zkClient;
 
   @BeforeClass()
-  public void beforeClass() throws IOException, Exception {
+  public void beforeClass() throws Exception {
     _zkClient = new ZkClient(ZK_ADDR);
     _zkClient.setZkSerializer(new ZNRecordSerializer());
     if (_zkClient.exists("/" + clusterName)) {
@@ -79,9 +78,7 @@ public class TestZKUtil extends ZkUnitTestBase {
     List<ZNRecord> list = new ArrayList<ZNRecord>();
     list.add(new ZNRecord("id1"));
     list.add(new ZNRecord("id2"));
-    String path =
-        PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
-            ConfigScopeProperty.PARTICIPANT.toString());
+    String path = PropertyPathBuilder.instanceConfig(clusterName);
     ZKUtil.createChildren(_zkClient, path, list);
     list = ZKUtil.getChildren(_zkClient, path);
     AssertJUnit.assertEquals(2, list.size());
@@ -96,63 +93,117 @@ public class TestZKUtil extends ZkUnitTestBase {
 
   @Test()
   public void testUpdateIfExists() {
-    String path =
-        PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
-            ConfigScopeProperty.PARTICIPANT.toString(), "id3");
+    String path = PropertyPathBuilder.instanceConfig(clusterName, "id3");
     ZNRecord record = new ZNRecord("id4");
     ZKUtil.updateIfExists(_zkClient, path, record, false);
     AssertJUnit.assertFalse(_zkClient.exists(path));
     _zkClient.createPersistent(path);
     ZKUtil.updateIfExists(_zkClient, path, record, false);
     AssertJUnit.assertTrue(_zkClient.exists(path));
-    record = _zkClient.<ZNRecord> readData(path);
+    record = _zkClient.readData(path);
     AssertJUnit.assertEquals("id4", record.getId());
   }
 
   @Test()
   public void testSubtract() {
-    String path =
-        PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
-            ConfigScopeProperty.PARTICIPANT.toString(), "id5");
+    String path = PropertyPathBuilder.instanceConfig(clusterName, "id5");
     ZNRecord record = new ZNRecord("id5");
     record.setSimpleField("key1", "value1");
     _zkClient.createPersistent(path, record);
     ZKUtil.subtract(_zkClient, path, record);
-    record = _zkClient.<ZNRecord> readData(path);
+    record = _zkClient.readData(path);
     AssertJUnit.assertNull(record.getSimpleField("key1"));
   }
 
   @Test()
   public void testNullChildren() {
-    String path =
-        PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
-            ConfigScopeProperty.PARTICIPANT.toString(), "id6");
+    String path = PropertyPathBuilder.instanceConfig(clusterName, "id6");
     ZKUtil.createChildren(_zkClient, path, (List<ZNRecord>) null);
   }
 
   @Test()
-  public void testCreateOrUpdate() {
-    String path =
-        PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
-            ConfigScopeProperty.PARTICIPANT.toString(), "id7");
+  public void testCreateOrMerge() {
+    String path = PropertyPathBuilder.instanceConfig(clusterName, "id7");
     ZNRecord record = new ZNRecord("id7");
-    ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
-    record = _zkClient.<ZNRecord> readData(path);
-    AssertJUnit.assertEquals("id7", record.getId());
+    List<String> list = Arrays.asList("value1");
+    record.setListField("list", list);
+    ZKUtil.createOrMerge(_zkClient, path, record, true, true);
+    record = _zkClient.readData(path);
+    AssertJUnit.assertEquals(list, record.getListField("list"));
+
+    record = new ZNRecord("id7");
+    List<String> list2 = Arrays.asList("value2");
+    record.setListField("list", list2);
+    ZKUtil.createOrMerge(_zkClient, path, record, true, true);
+    record = _zkClient.readData(path);
+    AssertJUnit.assertEquals(Arrays.asList("value1", "value2"), record.getListField("list"));
+
+    Map<String, String> map = new HashMap<String, String>() {{put("k1", "v1");}};
+    record.setMapField("map", map);
+    ZKUtil.createOrMerge(_zkClient, path, record, true, true);
+    record = _zkClient.readData(path);
+    AssertJUnit.assertEquals(map, record.getMapField("map"));
+
+    record = new ZNRecord("id7");
+    Map<String, String> map2 = new HashMap<String, String>() {{put("k2", "v2");}};
+    record.setMapField("map", map2);
+    ZKUtil.createOrMerge(_zkClient, path, record, true, true);
+    record = _zkClient.readData(path);
+    AssertJUnit.assertEquals(new HashMap<String, String>() {{
+      put("k1", "v1");
+      put("k2", "v2");
+    }}, record.getMapField("map"));
   }
 
   @Test()
   public void testCreateOrReplace() {
-    String path =
-        PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
-            ConfigScopeProperty.PARTICIPANT.toString(), "id8");
+    String path = PropertyPathBuilder.instanceConfig(clusterName, "id8");
     ZNRecord record = new ZNRecord("id8");
     ZKUtil.createOrReplace(_zkClient, path, record, true);
-    record = _zkClient.<ZNRecord> readData(path);
+    record = _zkClient.readData(path);
     AssertJUnit.assertEquals("id8", record.getId());
     record = new ZNRecord("id9");
     ZKUtil.createOrReplace(_zkClient, path, record, true);
-    record = _zkClient.<ZNRecord> readData(path);
+    record = _zkClient.readData(path);
     AssertJUnit.assertEquals("id9", record.getId());
   }
+
+  @Test()
+  public void testCreateOrUpdate() {
+    String path = PropertyPathBuilder.instanceConfig(clusterName, "id7");
+    ZNRecord record = new ZNRecord("id7");
+    ZKUtil.createOrMerge(_zkClient, path, record, true, true);
+    record = _zkClient.readData(path);
+    AssertJUnit.assertEquals("id7", record.getId());
+
+    record = new ZNRecord("id7");
+    List<String> list = Arrays.asList("value1", "value2");
+    record.setListField("list", list);
+    ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
+    record = _zkClient.readData(path);
+    AssertJUnit.assertEquals(list, record.getListField("list"));
+
+    record = new ZNRecord("id7");
+    List<String> list2 = Arrays.asList("value3", "value4");
+    record.setListField("list", list2);
+    ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
+    record = _zkClient.readData(path);
+    AssertJUnit.assertEquals(list2, record.getListField("list"));
+
+
+    Map<String, String> map = new HashMap<String, String>() {{put("k1", "v1");}};
+    record.setMapField("map", map);
+    ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
+    record = _zkClient.readData(path);
+    AssertJUnit.assertEquals(map, record.getMapField("map"));
+
+    record = new ZNRecord("id7");
+    Map<String, String> map2 = new HashMap<String, String>() {{put("k2", "v2");}};
+    record.setMapField("map", map2);
+    ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
+    record = _zkClient.readData(path);
+    AssertJUnit.assertEquals(new HashMap<String, String>() {{
+      put("k2", "v2");
+    }}, record.getMapField("map"));
+  }
 }


[2/2] helix git commit: Allow user to enable persisting preference list and best possible state map into IdealState in full-auto mode.

Posted by jx...@apache.org.
Allow user to enable persisting preference list and best possible state map into IdealState in full-auto mode.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8ba068e7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8ba068e7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8ba068e7

Branch: refs/heads/helix-0.6.x
Commit: 8ba068e7b78aedf4743f2da57670384534d1d4f8
Parents: 7c92bf5
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue May 23 13:58:24 2017 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue May 23 13:58:24 2017 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      |   3 +
 .../stages/BestPossibleStateOutput.java         |  42 +++++
 .../stages/PersistAssignmentStage.java          | 172 ++++++++++---------
 .../java/org/apache/helix/model/IdealState.java |  65 +++++--
 .../TestRebalancerPersistAssignments.java       | 126 +++++---------
 5 files changed, 227 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8ba068e7/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index cba0659..526f532 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -127,6 +127,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
           rebalancer.init(manager);
           idealState =
               rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
+          output.setPreferenceLists(resourceName, idealState.getPreferenceLists());
 
           // Use the internal MappingCalculator interface to compute the final assignment
           // The next release will support rebalancers that compute the mapping from start to finish
@@ -180,6 +181,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       rebalancer = customizedRebalancer;
       break;
     default:
+      logger.error(
+          "Fail to find the rebalancer, invalid rebalance mode " + idealState.getRebalanceMode());
       break;
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/8ba068e7/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
index 168a3b0..a3ad56d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.stages;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -29,6 +30,8 @@ import org.apache.helix.model.Partition;
 public class BestPossibleStateOutput {
   // Map of resource->partition->instance->state
   Map<String, Map<Partition, Map<String, String>>> _stateMap;
+  /* resource -> partition -> preference list */
+  private Map<String, Map<String, List<String>>> _preferenceLists;
 
   public BestPossibleStateOutput() {
     _stateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
@@ -77,6 +80,45 @@ public class BestPossibleStateOutput {
     return _stateMap;
   }
 
+  public Map<String, Map<String, List<String>>> getPreferenceLists() {
+    return _preferenceLists;
+  }
+
+  public Map<String, List<String>> getPreferenceLists(String resource) {
+    if (_preferenceLists != null && _preferenceLists.containsKey(resource)) {
+      return _preferenceLists.get(resource);
+    }
+
+    return null;
+  }
+
+  public List<String> getPreferenceList(String resource, String partition) {
+    if (_preferenceLists != null && _preferenceLists.containsKey(resource) && _preferenceLists
+        .get(resource).containsKey(partition)) {
+      return _preferenceLists.get(resource).get(partition);
+    }
+
+    return null;
+  }
+
+  public void setPreferenceList(String resource, String partition, List<String> list) {
+    if (_preferenceLists == null) {
+      _preferenceLists = new HashMap<String, Map<String, List<String>>>();
+    }
+    if (!_preferenceLists.containsKey(resource)) {
+      _preferenceLists.put(resource, new HashMap<String, List<String>>());
+    }
+    _preferenceLists.get(resource).put(partition, list);
+  }
+
+  public void setPreferenceLists(String resource,
+      Map<String, List<String>> resourcePreferenceLists) {
+    if (_preferenceLists == null) {
+      _preferenceLists = new HashMap<String, Map<String, List<String>>>();
+    }
+    _preferenceLists.put(resource, resourcePreferenceLists);
+  }
+
   @Override
   public String toString() {
     return _stateMap.toString();

http://git-wip-us.apache.org/repos/asf/helix/blob/8ba068e7/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 9c297f8..425b38b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -19,19 +19,16 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-
+import java.util.Set;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.log4j.Logger;
@@ -49,56 +46,58 @@ public class PersistAssignmentStage extends AbstractBaseStage {
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
     ClusterConfig clusterConfig = cache.getClusterConfig();
 
-    if (clusterConfig.isPersistBestPossibleAssignment()) {
-      HelixManager helixManager = event.getAttribute("helixmanager");
-      HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
-      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-      BestPossibleStateOutput bestPossibleAssignments =
-          event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-      Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-
-      for (String resourceId : bestPossibleAssignments.resourceSet()) {
-        Resource resource = resourceMap.get(resourceId);
-        if (resource != null) {
-          boolean changed = false;
-          Map<Partition, Map<String, String>> bestPossibleAssignment =
-              bestPossibleAssignments.getResourceMap(resourceId);
-          IdealState idealState = cache.getIdealState(resourceId);
-          if (idealState == null) {
-            LOG.warn("IdealState not found for resource " + resourceId);
-            continue;
-          }
-          IdealState.RebalanceMode mode = idealState.getRebalanceMode();
-          if (!mode.equals(IdealState.RebalanceMode.SEMI_AUTO) && !mode
-              .equals(IdealState.RebalanceMode.FULL_AUTO)) {
-            // do not persist assignment for resource in neither semi or full auto.
-            continue;
-          }
+    if (!clusterConfig.isPersistBestPossibleAssignment()) {
+      return;
+    }
 
-          //TODO: temporary solution for Espresso/Dbus backcompatible, should remove this.
-          Map<Partition, Map<String, String>> assignmentToPersist =
-              convertAssignmentPersisted(resource, idealState, bestPossibleAssignment);
-
-          for (Partition partition : resource.getPartitions()) {
-            Map<String, String> instanceMap = assignmentToPersist.get(partition);
-            Map<String, String> existInstanceMap =
-                idealState.getInstanceStateMap(partition.getPartitionName());
-            if (instanceMap == null && existInstanceMap == null) {
-              continue;
-            }
-            if (instanceMap == null || existInstanceMap == null || !instanceMap
-                .equals(existInstanceMap)) {
-              changed = true;
-              break;
-            }
+    BestPossibleStateOutput bestPossibleAssignment =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+    HelixManager helixManager = event.getAttribute("helixmanager");
+    HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+
+    for (String resourceId : bestPossibleAssignment.resourceSet()) {
+      Resource resource = resourceMap.get(resourceId);
+      if (resource != null) {
+        final IdealState idealState = cache.getIdealState(resourceId);
+        if (idealState == null) {
+          LOG.warn("IdealState not found for resource " + resourceId);
+          continue;
+        }
+        IdealState.RebalanceMode mode = idealState.getRebalanceMode();
+        if (!mode.equals(IdealState.RebalanceMode.SEMI_AUTO) && !mode
+            .equals(IdealState.RebalanceMode.FULL_AUTO)) {
+          // do not persist the assignment for a resource that is in neither semi-auto nor full-auto mode.
+          continue;
+        }
+
+        boolean needPersist = false;
+        if (mode.equals(IdealState.RebalanceMode.FULL_AUTO)) {
+          // persist preference list in full-auto mode.
+          Map<String, List<String>> newLists =
+              bestPossibleAssignment.getPreferenceLists(resourceId);
+          if (newLists != null && hasPreferenceListChanged(newLists, idealState)) {
+            idealState.setPreferenceLists(newLists);
+            needPersist = true;
           }
-          if (changed) {
-            for (Partition partition : assignmentToPersist.keySet()) {
-              Map<String, String> instanceMap = assignmentToPersist.get(partition);
-              idealState.setInstanceStateMap(partition.getPartitionName(), instanceMap);
-            }
-            accessor.setProperty(keyBuilder.idealStates(resourceId), idealState);
+        }
+
+        Map<Partition, Map<String, String>> bestPossibleAssignements =
+            bestPossibleAssignment.getResourceMap(resourceId);
+
+        if (bestPossibleAssignements != null && hasInstanceMapChanged(bestPossibleAssignements,
+            idealState)) {
+          for (Partition partition : bestPossibleAssignements.keySet()) {
+            Map<String, String> instanceMap = bestPossibleAssignements.get(partition);
+            idealState.setInstanceStateMap(partition.getPartitionName(), instanceMap);
           }
+          needPersist = true;
+        }
+
+        if (needPersist) {
+          accessor.setProperty(keyBuilder.idealStates(resourceId), idealState);
         }
       }
     }
@@ -108,47 +107,50 @@ public class PersistAssignmentStage extends AbstractBaseStage {
   }
 
   /**
-   * TODO: This is a temporary hacky for back-compatible support of Espresso and Databus,
-   * we should get rid of this conversion as soon as possible.
-   * --- Lei, 2016/9/9.
+   * Checks whether the given preference lists differ from the ones persisted in the current IdealState.
    */
-  private Map<Partition, Map<String, String>> convertAssignmentPersisted(Resource resource,
-      IdealState idealState, Map<Partition, Map<String, String>> bestPossibleAssignment) {
-    String stateModelDef = idealState.getStateModelDefRef();
-    /** Only convert for MasterSlave resources */
-    if (!stateModelDef.equals(BuiltInStateModelDefinitions.MasterSlave.name())) {
-      return bestPossibleAssignment;
+  private boolean hasPreferenceListChanged(Map<String, List<String>> newLists,
+      IdealState idealState) {
+    Map<String, List<String>> existLists = idealState.getPreferenceLists();
+
+    Set<String> partitions = new HashSet<String>(newLists.keySet());
+    partitions.addAll(existLists.keySet());
+
+    for (String partition : partitions) {
+      List<String> assignedInstances = newLists.get(partition);
+      List<String> existingInstances = existLists.get(partition);
+      if (assignedInstances == null && existingInstances == null) {
+        continue;
+      }
+      if (assignedInstances == null || existingInstances == null || !assignedInstances
+          .equals(existingInstances)) {
+        return true;
+      }
     }
 
-    Map<Partition, Map<String, String>> assignmentToPersist =
-        new HashMap<Partition, Map<String, String>>();
-
-    for (Partition partition : resource.getPartitions()) {
-      Map<String, String> instanceMap = new HashMap<String, String>();
-      instanceMap.putAll(bestPossibleAssignment.get(partition));
+    return false;
+  }
 
-      List<String> preferenceList = idealState.getPreferenceList(partition.getPartitionName());
-      boolean hasMaster = false;
-      for (String ins : preferenceList) {
-        String state = instanceMap.get(ins);
-        if (state == null || (!state.equals(MasterSlaveSMD.States.SLAVE.name()) && !state
-            .equals(MasterSlaveSMD.States.MASTER.name()))) {
-          instanceMap.put(ins, MasterSlaveSMD.States.SLAVE.name());
-        }
+  private boolean hasInstanceMapChanged(Map<Partition, Map<String, String>> newAssiments,
+      IdealState idealState) {
+    Set<Partition> partitions = new HashSet<Partition>(newAssiments.keySet());
+    for (String p : idealState.getPartitionSet()) {
+      partitions.add(new Partition(p));
+    }
 
-        if (state != null && state.equals(MasterSlaveSMD.States.MASTER.name())) {
-          hasMaster = true;
-        }
+    for (Partition partition : partitions) {
+      Map<String, String> instanceMap = newAssiments.get(partition);
+      Map<String, String> existInstanceMap =
+          idealState.getInstanceStateMap(partition.getPartitionName());
+      if (instanceMap == null && existInstanceMap == null) {
+        continue;
       }
-
-      // if no master, just pick the first node in the preference list as the master.
-      if (!hasMaster && preferenceList.size() > 0) {
-        instanceMap.put(preferenceList.get(0), MasterSlaveSMD.States.MASTER.name());
+      if (instanceMap == null || existInstanceMap == null || !instanceMap
+          .equals(existInstanceMap)) {
+        return true;
       }
-
-      assignmentToPersist.put(partition, instanceMap);
     }
 
-    return assignmentToPersist;
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8ba068e7/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 907bd27..48e43d6 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -352,6 +352,9 @@ public class IdealState extends HelixProperty {
 
   /**
    * Get the current mapping of a partition
+   * CAUTION: In FULL-AUTO mode, this method could return empty map if
+   * {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)} is set to true.
+   *
    * @param partitionName the name of the partition
    * @return the instances where the replicas live and the state of each
    */
@@ -371,37 +374,75 @@ public class IdealState extends HelixProperty {
   }
 
   /**
-   * Get the instances who host replicas of a partition
+   * Get the instances who host replicas of a partition.
+   * CAUTION: In FULL-AUTO mode, this method could return an empty set if
+   * {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)} is set to true.
+   *
    * @param partitionName the partition to look up
    * @return set of instance names
    */
   public Set<String> getInstanceSet(String partitionName) {
-    if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
-        || getRebalanceMode() == RebalanceMode.FULL_AUTO
-        || getRebalanceMode() == RebalanceMode.USER_DEFINED
-        || getRebalanceMode() == RebalanceMode.TASK) {
+    switch (getRebalanceMode()) {
+    case FULL_AUTO:
+    case SEMI_AUTO:
+    case USER_DEFINED:
+    case TASK:
       List<String> prefList = _record.getListField(partitionName);
-      if (prefList != null) {
+      if (prefList != null && !prefList.isEmpty()) {
         return new TreeSet<String>(prefList);
       } else {
-        logger.warn(partitionName + " does NOT exist");
-        return Collections.emptySet();
+        Map<String, String> stateMap = _record.getMapField(partitionName);
+        if (stateMap != null && !stateMap.isEmpty()) {
+          return new TreeSet<String>(stateMap.keySet());
+        } else {
+          logger.warn(partitionName + " does NOT exist");
+        }
       }
-    } else if (getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+      break;
+    case CUSTOMIZED:
       Map<String, String> stateMap = _record.getMapField(partitionName);
       if (stateMap != null) {
         return new TreeSet<String>(stateMap.keySet());
       } else {
         logger.warn(partitionName + " does NOT exist");
-        return Collections.emptySet();
       }
-    } else {
+      break;
+    case NONE:
+    default:
       logger.error("Invalid ideal state mode: " + getResourceName());
-      return Collections.emptySet();
+      break;
     }
 
+    return Collections.emptySet();
+  }
+
+  /** Set the preference list of a partition
+   * @param partitionName the name of the partition
+   * @param instanceList the instance preference list
+   */
+  public void setPreferenceList(String partitionName, List<String> instanceList) {
+    _record.setListField(partitionName, instanceList);
+  }
+
+  /**
+   * Set the preference lists for all partitions in this resource.
+   *
+   * @param instanceLists the map of instance preference lists.
+   */
+  public void setPreferenceLists(Map<String, List<String>> instanceLists) {
+    _record.setListFields(instanceLists);
+  }
+
+  /**
+   * Get the preference lists for all partitions
+   *
+   * @return map of lists of instances for all partitions in this resource.
+   */
+  public Map<String, List<String>> getPreferenceLists() {
+    return _record.getListFields();
   }
 
+
   /**
    * Get the preference list of a partition
    * @param partitionName the name of the partition

http://git-wip-us.apache.org/repos/asf/helix/blob/8ba068e7/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
index 3aec847..2a9dc69 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
@@ -24,11 +24,8 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -42,6 +39,8 @@ import java.util.Map;
 import java.util.Set;
 
 public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
+  Set<String> _instanceNames = new HashSet<String>();
+
   @Override
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -69,13 +68,14 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
     // start dummy participants
     for (int i = 0; i < NODE_NR; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _instanceNames.add(instanceName);
       _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
       _participants[i].syncStart();
     }
   }
 
   @DataProvider(name = "rebalanceModes")
-  public static RebalanceMode [][] rebalanceModes() {
+  public static Object [][] rebalanceModes() {
     return new RebalanceMode[][] { {RebalanceMode.SEMI_AUTO}, {RebalanceMode.FULL_AUTO}};
   }
 
@@ -88,23 +88,25 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
         BuiltInStateModelDefinitions.LeaderStandby.name(), rebalanceMode.name());
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
 
-    HelixClusterVerifier verifier =
+    BestPossibleExternalViewVerifier.Builder verifierBuilder =
         new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
-    Thread.sleep(500);
-    Assert.assertTrue(verifier.verify());
+            .setResources(new HashSet<String>(Collections.singleton(testDb)));
+
+    Assert.assertTrue(verifierBuilder.build().verify());
 
     // kill 1 node
     _participants[0].syncStop();
 
-    Assert.assertTrue(verifier.verify());
+    Set<String> liveInstances = new HashSet<String>(_instanceNames);
+    liveInstances.remove(_participants[0].getInstanceName());
+    verifierBuilder.setExpectLiveInstances(liveInstances);
+    Assert.assertTrue(verifierBuilder.build().verify());
 
     IdealState idealState =
         _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
 
     Set<String> excludedInstances = new HashSet<String>();
     excludedInstances.add(_participants[0].getInstanceName());
-    Thread.sleep(2000);
     verifyAssignmentInIdealStateWithPersistDisabled(idealState, excludedInstances);
 
     // clean up
@@ -124,10 +126,11 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
         BuiltInStateModelDefinitions.LeaderStandby.name(), rebalanceMode.name());
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
 
-    HelixClusterVerifier verifier =
+    BestPossibleExternalViewVerifier.Builder verifierBuilder =
         new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
-    Assert.assertTrue(verifier.verify());
+            .setResources(new HashSet<String>(Collections.singleton(testDb)));
+
+    Assert.assertTrue(verifierBuilder.build().verify());
 
     IdealState idealState =
         _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
@@ -136,9 +139,10 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
     // kill 1 node
     _participants[0].syncStop();
 
-    Boolean result = ClusterStateVerifier.verifyByZkCallback(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
+    Set<String> liveInstances = new HashSet<String>(_instanceNames);
+    liveInstances.remove(_participants[0].getInstanceName());
+    verifierBuilder.setExpectLiveInstances(liveInstances);
+    Assert.assertTrue(verifierBuilder.build().verify());
 
     idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
     // verify that IdealState contains updated assignment in it map fields.
@@ -154,72 +158,8 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
     _participants[0].syncStart();
   }
 
-  /**
-   * This test is to test the temporary solution for solving Espresso/Databus back-compatible map format issue.
-   *
-   * @throws Exception
-   */
-  @Test(dependsOnMethods = { "testDisablePersist" })
-  public void testSemiAutoEnablePersistMasterSlave() throws Exception {
-    String testDb = "TestDB1-MasterSlave";
-    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
-
-    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, BuiltInStateModelDefinitions.MasterSlave.name(),
-        RebalanceMode.SEMI_AUTO.name());
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
-
-    HelixClusterVerifier verifier =
-        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
-    Assert.assertTrue(verifier.verify());
-
-    IdealState idealState =
-        _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
-    verifySemiAutoMasterSlaveAssignment(idealState);
-
-    // kill 1 node
-    _participants[0].syncStop();
-
-    Assert.assertTrue(verifier.verify());
-
-    idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
-    verifySemiAutoMasterSlaveAssignment(idealState);
-
-    // disable an instance
-    _setupTool.getClusterManagementTool()
-        .enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), false);
-    idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
-    verifySemiAutoMasterSlaveAssignment(idealState);
-
-    // clean up
-    _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
-    _setupTool.getClusterManagementTool()
-        .enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), true);
-    _participants[0].reset();
-    _participants[0].syncStart();
-  }
-
-  private void verifySemiAutoMasterSlaveAssignment(IdealState idealState) {
-    for (String partition : idealState.getPartitionSet()) {
-      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(partition);
-      List<String> preferenceList = idealState.getPreferenceList(partition);
-      int numMaster = 0;
-
-      for (String ins : preferenceList) {
-        Assert.assertTrue(instanceStateMap.containsKey(ins));
-        String state = instanceStateMap.get(ins);
-        Assert.assertTrue(state.equals(MasterSlaveSMD.States.MASTER.name()) || state
-            .equals(MasterSlaveSMD.States.SLAVE.name()));
-        if (state.equals(MasterSlaveSMD.States.MASTER.name())) {
-          numMaster++;
-        }
-      }
-
-      Assert.assertEquals(numMaster, 1);
-    }
-  }
-
-  // verify that the disabled or failed instance should not be included in bestPossible assignment.
+  // verify that both the list field and the map field are persisted in the IdealState,
+  // and that disabled or failed instances are not included in the bestPossible assignment.
   private void verifyAssignmentInIdealStateWithPersistEnabled(IdealState idealState,
       Set<String> excludedInstances) {
     for (String partition : idealState.getPartitionSet()) {
@@ -228,8 +168,20 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
       Assert.assertFalse(instanceStateMap.isEmpty());
 
       Set<String> instancesInMap = instanceStateMap.keySet();
-      Set<String> instanceInList = idealState.getInstanceSet(partition);
-      Assert.assertTrue(instanceInList.containsAll(instancesInMap));
+      if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+        Set<String> instanceInList = idealState.getInstanceSet(partition);
+        Assert.assertTrue(instanceInList.containsAll(instancesInMap));
+      }
+
+      if(idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+        // preference list should be persisted in IS.
+        List<String> instanceList = idealState.getPreferenceList(partition);
+        Assert.assertNotNull(instanceList);
+        Assert.assertFalse(instanceList.isEmpty());
+        for (String ins : excludedInstances) {
+          Assert.assertFalse(instanceList.contains(ins));
+        }
+      }
 
       for (String ins : excludedInstances) {
         Assert.assertFalse(instancesInMap.contains(ins));
@@ -254,6 +206,12 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
           // if at least one excluded instance is included, it means assignment was not updated.
           assignmentNotChanged = true;
         }
+        if(idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+          List<String> instanceList = idealState.getPreferenceList(partition);
+          if (instanceList.contains(ins)) {
+            assignmentNotChanged = true;
+          }
+        }
       }
     }