You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/05/23 21:28:47 UTC
[1/2] helix git commit: Add support of setting/updating
Cluster/Resource/Instance configs in ConfigAccessor.
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 8cf80a9f8 -> 8ba068e7b
Add support of setting/updating Cluster/Resource/Instance configs in ConfigAccessor.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7c92bf54
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7c92bf54
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7c92bf54
Branch: refs/heads/helix-0.6.x
Commit: 7c92bf543571daac555f5d8b933805dddced7ca5
Parents: 8cf80a9
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue May 23 12:27:23 2017 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue May 23 12:27:23 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/ConfigAccessor.java | 248 +++++++++++++++++++
.../main/java/org/apache/helix/ZNRecord.java | 19 ++
.../java/org/apache/helix/ZNRecordDelta.java | 5 +-
.../org/apache/helix/manager/zk/ZKUtil.java | 31 ++-
.../org/apache/helix/model/ClusterConfig.java | 15 ++
.../integration/ZkIntegrationTestBase.java | 12 +-
.../org/apache/helix/manager/zk/TestZKUtil.java | 113 ++++++---
7 files changed, 401 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 34aef49..27a30cb 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -27,11 +27,15 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.util.StringTemplate;
import org.apache.log4j.Logger;
@@ -509,4 +513,248 @@ public class ConfigAccessor {
}
return retKeys;
}
+
+ private ZNRecord getConfigZnRecord(HelixConfigScope scope) {
+ String clusterName = scope.getClusterName();
+ if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
+ throw new HelixException("fail to get configs. cluster " + clusterName + " is not setup yet");
+ }
+
+ return zkClient.readData(scope.getZkPath(), true);
+ }
+
+ /**
+ * Get ClusterConfig of the given cluster.
+ *
+ * @param clusterName
+ *
+ * @return
+ */
+ public ClusterConfig getClusterConfig(String clusterName) {
+ if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
+ throw new HelixException("fail to get config. cluster: " + clusterName + " is NOT setup.");
+ }
+
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+ ZNRecord record = getConfigZnRecord(scope);
+
+ if (record == null) {
+ LOG.warn("No config found at " + scope.getZkPath());
+ return null;
+ }
+
+ return new ClusterConfig(record);
+ }
+
+ /**
+ * Set ClusterConfig of the given cluster.
+ * The current Cluster config will be replaced with the given clusterConfig.
+ * WARNING: This is not thread-safe or concurrent updates safe.
+ *
+ * @param clusterName
+ * @param clusterConfig
+ *
+ * @return
+ */
+ public void setClusterConfig(String clusterName, ClusterConfig clusterConfig) {
+ updateClusterConfig(clusterName, clusterConfig, true);
+ }
+
+ /**
+ * Update ClusterConfig of the given cluster.
+ * The value of field in current config will be replaced with the value of the same field in given config if it
+ * presents. If there is new field in given config but not in current config, the field will be added into
+ * the current config..
+ * The list fields and map fields will be replaced as a single entry.
+ *
+ * The current Cluster config will be replaced with the given clusterConfig.
+ * WARNING: This is not thread-safe or concurrent updates safe.
+ *
+ * @param clusterName
+ * @param clusterConfig
+ *
+ * @return
+ */
+ public void updateClusterConfig(String clusterName, ClusterConfig clusterConfig) {
+ updateClusterConfig(clusterName, clusterConfig, false);
+ }
+
+
+ private void updateClusterConfig(String clusterName, ClusterConfig clusterConfig, boolean overwrite) {
+ if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
+ throw new HelixException("fail to update config. cluster: " + clusterName + " is NOT setup.");
+ }
+
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+ String zkPath = scope.getZkPath();
+
+ if (overwrite) {
+ ZKUtil.createOrReplace(zkClient, zkPath, clusterConfig.getRecord(), true);
+ } else {
+ ZKUtil.createOrUpdate(zkClient, zkPath, clusterConfig.getRecord(), true, true);
+ }
+ }
+
+ /**
+ * Get resource config for given resource in given cluster.
+ *
+ * @param clusterName
+ * @param resourceName
+ *
+ * @return
+ */
+ public ResourceConfig getResourceConfig(String clusterName, String resourceName) {
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(clusterName)
+ .forResource(resourceName).build();
+ ZNRecord record = getConfigZnRecord(scope);
+
+ if (record == null) {
+ LOG.warn("No config found at " + scope.getZkPath());
+ return null;
+ }
+
+ return new ResourceConfig(record);
+ }
+
+ /**
+ * Set config of the given resource.
+ * The current Resource config will be replaced with the given clusterConfig.
+ *
+ * WARNING: This is not thread-safe or concurrent updates safe.
+ *
+ * @param clusterName
+ * @param resourceName
+ * @param resourceConfig
+ *
+ * @return
+ */
+ public void setResourceConfig(String clusterName, String resourceName,
+ ResourceConfig resourceConfig) {
+ updateResourceConfig(clusterName, resourceName, resourceConfig, true);
+ }
+
+ /**
+ * Update ResourceConfig of the given resource.
+ * The value of field in current config will be replaced with the value of the same field in given config if it
+ * presents. If there is new field in given config but not in current config, the field will be added into
+ * the current config..
+ * The list fields and map fields will be replaced as a single entry.
+ *
+ * The current Cluster config will be replaced with the given clusterConfig.
+ * WARNING: This is not thread-safe or concurrent updates safe.
+ *
+ * @param clusterName
+ * @param resourceName
+ * @param resourceConfig
+ *
+ * @return
+ */
+ public void updateResourceConfig(String clusterName, String resourceName,
+ ResourceConfig resourceConfig) {
+ updateResourceConfig(clusterName, resourceName, resourceConfig, false);
+ }
+
+ private void updateResourceConfig(String clusterName, String resourceName,
+ ResourceConfig resourceConfig, boolean overwrite) {
+ if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
+ throw new HelixException("fail to setup config. cluster: " + clusterName + " is NOT setup.");
+ }
+
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(clusterName)
+ .forResource(resourceName).build();
+ String zkPath = scope.getZkPath();
+
+ if (overwrite) {
+ ZKUtil.createOrReplace(zkClient, zkPath, resourceConfig.getRecord(), true);
+ } else {
+ ZKUtil.createOrUpdate(zkClient, zkPath, resourceConfig.getRecord(), true, true);
+ }
+ }
+
+ /**
+ * Get instance config for given resource in given cluster.
+ *
+ * @param clusterName
+ * @param instanceName
+ *
+ * @return
+ */
+ public InstanceConfig getInstanceConfig(String clusterName, String instanceName) {
+ if (!ZKUtil.isInstanceSetup(zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) {
+ throw new HelixException(
+ "fail to get config. instance: " + instanceName + " is NOT setup in cluster: "
+ + clusterName);
+ }
+
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT).forCluster(clusterName)
+ .forParticipant(instanceName).build();
+ ZNRecord record = getConfigZnRecord(scope);
+
+ if (record == null) {
+ LOG.warn("No config found at " + scope.getZkPath());
+ return null;
+ }
+
+ return new InstanceConfig(record);
+ }
+
+ /**
+ * Set config of the given instance config.
+ * The current instance config will be replaced with the given instanceConfig.
+ * WARNING: This is not thread-safe or concurrent updates safe.
+ *
+ * @param clusterName
+ * @param instanceName
+ * @param instanceConfig
+ *
+ * @return
+ */
+ public void setInstanceConfig(String clusterName, String instanceName,
+ InstanceConfig instanceConfig) {
+ updateInstanceConfig(clusterName, instanceName, instanceConfig, true);
+
+ }
+
+ /**
+ * Update ResourceConfig of the given resource. The value of field in current config will be
+ * replaced with the value of the same field in given config if it presents. If there is new field
+ * in given config but not in current config, the field will be added into the current config..
+ * The list fields and map fields will be replaced as a single entry.
+ * The current Cluster config will be replaced with the given clusterConfig. WARNING: This is not
+ * thread-safe or concurrent updates safe.
+ * *
+ *
+ * @param clusterName
+ * @param instanceName
+ * @param instanceConfig
+ *
+ * @return
+ */
+ public void updateInstanceConfig(String clusterName, String instanceName,
+ InstanceConfig instanceConfig) {
+ updateInstanceConfig(clusterName, instanceName, instanceConfig, false);
+ }
+
+ private void updateInstanceConfig(String clusterName, String instanceName,
+ InstanceConfig instanceConfig, boolean overwrite) {
+ if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
+ throw new HelixException("fail to setup config. cluster: " + clusterName + " is NOT setup.");
+ }
+
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT).forCluster(clusterName)
+ .forParticipant(instanceName).build();
+ String zkPath = scope.getZkPath();
+
+ if (overwrite) {
+ ZKUtil.createOrReplace(zkClient, zkPath, instanceConfig.getRecord(), true);
+ } else {
+ ZKUtil.createOrUpdate(zkClient, zkPath, instanceConfig.getRecord(), true, true);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/main/java/org/apache/helix/ZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
index 3ac9485..52bd5be 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -512,7 +512,24 @@ public class ZNRecord {
}
/**
+ * Replace functionality is used to update this ZNRecord with the given ZNRecord. The value of a
+ * field in this record will be replaced with the value of the same field in given record if it
+ * presents. If there is new field in given ZNRecord but not in this record, add that field into
+ * this record. The list fields and map fields will be replaced as a single entry.
+ *
+ * @param record
+ */
+ public void update(ZNRecord record) {
+ if (record != null) {
+ simpleFields.putAll(record.simpleFields);
+ listFields.putAll(record.listFields);
+ mapFields.putAll(record.mapFields);
+ }
+ }
+
+ /**
* Merge in a {@link ZNRecordDelta} corresponding to its merge policy
+ *
* @param delta
*/
void merge(ZNRecordDelta delta) {
@@ -520,6 +537,8 @@ public class ZNRecord {
merge(delta.getRecord());
} else if (delta.getMergeOperation() == MergeOperation.SUBTRACT) {
subtract(delta.getRecord());
+ } else if (delta.getMergeOperation() == MergeOperation.UPDATE) {
+ update(delta.getRecord());
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java b/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
index eff725d..616e1f5 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
@@ -28,7 +28,8 @@ public class ZNRecordDelta {
*/
public enum MergeOperation {
ADD,
- SUBTRACT
+ SUBTRACT,
+ UPDATE
};
/**
@@ -44,7 +45,7 @@ public class ZNRecordDelta {
/**
* Initialize the delta with a record and the update mode
* @param record
- * @param _mergeOperation
+ * @param mergeOperation
*/
public ZNRecordDelta(ZNRecord record, MergeOperation mergeOperation) {
_record = new ZNRecord(record);
http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 9131e5b..2b4cfb2 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -196,7 +196,7 @@ public final class ZKUtil {
}
}
- public static void createOrUpdate(ZkClient client, String path, final ZNRecord record,
+ public static void createOrMerge(ZkClient client, String path, final ZNRecord record,
final boolean persistent, final boolean mergeOnUpdate) {
int retryCount = 0;
while (retryCount < RETRYLIMIT) {
@@ -232,6 +232,35 @@ public final class ZKUtil {
}
}
+ public static void createOrUpdate(ZkClient client, String path, final ZNRecord record,
+ final boolean persistent, final boolean mergeOnUpdate) {
+ int retryCount = 0;
+ while (retryCount < RETRYLIMIT) {
+ try {
+ if (client.exists(path)) {
+ DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+ @Override public ZNRecord update(ZNRecord currentData) {
+ if (currentData != null && mergeOnUpdate) {
+ currentData.update(record);
+ return currentData;
+ }
+ return record;
+ }
+ };
+ client.updateDataSerialized(path, updater);
+ } else {
+ CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
+ client.create(path, record, mode);
+ }
+ break;
+ } catch (Exception e) {
+ retryCount = retryCount + 1;
+ logger.warn("Exception trying to update " + path + " Exception:" + e.getMessage()
+ + ". Will retry.");
+ }
+ }
+ }
+
public static void asyncCreateOrUpdate(ZkClient client, String path, final ZNRecord record,
final boolean persistent, final boolean mergeOnUpdate) {
try {
http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 2be7ee1..3796f36 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -68,6 +68,21 @@ public class ClusterConfig extends HelixProperty {
}
/**
+ * Enable/Disable persist best possible assignment in a resource's idealstate.
+ *
+ * @return
+ */
+ public void setPersistBestPossibleAssignment(Boolean enable) {
+ if (enable == null) {
+ _record.getSimpleFields()
+ .remove(ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.toString());
+ } else {
+ _record.setBooleanField(ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.toString(),
+ enable);
+ }
+ }
+
+ /**
*
* @return
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index 0edd4d3..08a5730 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -104,15 +104,11 @@ public class ZkIntegrationTestBase {
}
protected void enablePersistBestPossibleAssignment(ZkClient zkClient, String clusterName,
- Boolean enable) {
+ Boolean enabled) {
ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
- HelixConfigScope clusterScope =
- new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
- .forCluster(clusterName).build();
-
- configAccessor.set(clusterScope,
- ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(),
- enable.toString());
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+ clusterConfig.setPersistBestPossibleAssignment(enabled);
+ configAccessor.setClusterConfig(clusterName, clusterConfig);
}
protected void disableDelayRebalanceInCluster(ZkClient zkClient, String clusterName,
http://git-wip-us.apache.org/repos/asf/helix/blob/7c92bf54/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
index ed604fa..19d30a6 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
@@ -19,16 +19,15 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
-
+import java.util.Map;
import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.log4j.Logger;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
@@ -42,7 +41,7 @@ public class TestZKUtil extends ZkUnitTestBase {
ZkClient _zkClient;
@BeforeClass()
- public void beforeClass() throws IOException, Exception {
+ public void beforeClass() throws Exception {
_zkClient = new ZkClient(ZK_ADDR);
_zkClient.setZkSerializer(new ZNRecordSerializer());
if (_zkClient.exists("/" + clusterName)) {
@@ -79,9 +78,7 @@ public class TestZKUtil extends ZkUnitTestBase {
List<ZNRecord> list = new ArrayList<ZNRecord>();
list.add(new ZNRecord("id1"));
list.add(new ZNRecord("id2"));
- String path =
- PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.PARTICIPANT.toString());
+ String path = PropertyPathBuilder.instanceConfig(clusterName);
ZKUtil.createChildren(_zkClient, path, list);
list = ZKUtil.getChildren(_zkClient, path);
AssertJUnit.assertEquals(2, list.size());
@@ -96,63 +93,117 @@ public class TestZKUtil extends ZkUnitTestBase {
@Test()
public void testUpdateIfExists() {
- String path =
- PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.PARTICIPANT.toString(), "id3");
+ String path = PropertyPathBuilder.instanceConfig(clusterName, "id3");
ZNRecord record = new ZNRecord("id4");
ZKUtil.updateIfExists(_zkClient, path, record, false);
AssertJUnit.assertFalse(_zkClient.exists(path));
_zkClient.createPersistent(path);
ZKUtil.updateIfExists(_zkClient, path, record, false);
AssertJUnit.assertTrue(_zkClient.exists(path));
- record = _zkClient.<ZNRecord> readData(path);
+ record = _zkClient.readData(path);
AssertJUnit.assertEquals("id4", record.getId());
}
@Test()
public void testSubtract() {
- String path =
- PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.PARTICIPANT.toString(), "id5");
+ String path = PropertyPathBuilder.instanceConfig(clusterName, "id5");
ZNRecord record = new ZNRecord("id5");
record.setSimpleField("key1", "value1");
_zkClient.createPersistent(path, record);
ZKUtil.subtract(_zkClient, path, record);
- record = _zkClient.<ZNRecord> readData(path);
+ record = _zkClient.readData(path);
AssertJUnit.assertNull(record.getSimpleField("key1"));
}
@Test()
public void testNullChildren() {
- String path =
- PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.PARTICIPANT.toString(), "id6");
+ String path = PropertyPathBuilder.instanceConfig(clusterName, "id6");
ZKUtil.createChildren(_zkClient, path, (List<ZNRecord>) null);
}
@Test()
- public void testCreateOrUpdate() {
- String path =
- PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.PARTICIPANT.toString(), "id7");
+ public void testCreateOrMerge() {
+ String path = PropertyPathBuilder.instanceConfig(clusterName, "id7");
ZNRecord record = new ZNRecord("id7");
- ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
- record = _zkClient.<ZNRecord> readData(path);
- AssertJUnit.assertEquals("id7", record.getId());
+ List<String> list = Arrays.asList("value1");
+ record.setListField("list", list);
+ ZKUtil.createOrMerge(_zkClient, path, record, true, true);
+ record = _zkClient.readData(path);
+ AssertJUnit.assertEquals(list, record.getListField("list"));
+
+ record = new ZNRecord("id7");
+ List<String> list2 = Arrays.asList("value2");
+ record.setListField("list", list2);
+ ZKUtil.createOrMerge(_zkClient, path, record, true, true);
+ record = _zkClient.readData(path);
+ AssertJUnit.assertEquals(Arrays.asList("value1", "value2"), record.getListField("list"));
+
+ Map<String, String> map = new HashMap<String, String>() {{put("k1", "v1");}};
+ record.setMapField("map", map);
+ ZKUtil.createOrMerge(_zkClient, path, record, true, true);
+ record = _zkClient.readData(path);
+ AssertJUnit.assertEquals(map, record.getMapField("map"));
+
+ record = new ZNRecord("id7");
+ Map<String, String> map2 = new HashMap<String, String>() {{put("k2", "v2");}};
+ record.setMapField("map", map2);
+ ZKUtil.createOrMerge(_zkClient, path, record, true, true);
+ record = _zkClient.readData(path);
+ AssertJUnit.assertEquals(new HashMap<String, String>() {{
+ put("k1", "v1");
+ put("k2", "v2");
+ }}, record.getMapField("map"));
}
@Test()
public void testCreateOrReplace() {
- String path =
- PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.PARTICIPANT.toString(), "id8");
+ String path = PropertyPathBuilder.instanceConfig(clusterName, "id8");
ZNRecord record = new ZNRecord("id8");
ZKUtil.createOrReplace(_zkClient, path, record, true);
- record = _zkClient.<ZNRecord> readData(path);
+ record = _zkClient.readData(path);
AssertJUnit.assertEquals("id8", record.getId());
record = new ZNRecord("id9");
ZKUtil.createOrReplace(_zkClient, path, record, true);
- record = _zkClient.<ZNRecord> readData(path);
+ record = _zkClient.readData(path);
AssertJUnit.assertEquals("id9", record.getId());
}
+
+ @Test()
+ public void testCreateOrUpdate() {
+ String path = PropertyPathBuilder.instanceConfig(clusterName, "id7");
+ ZNRecord record = new ZNRecord("id7");
+ ZKUtil.createOrMerge(_zkClient, path, record, true, true);
+ record = _zkClient.readData(path);
+ AssertJUnit.assertEquals("id7", record.getId());
+
+ record = new ZNRecord("id7");
+ List<String> list = Arrays.asList("value1", "value2");
+ record.setListField("list", list);
+ ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
+ record = _zkClient.readData(path);
+ AssertJUnit.assertEquals(list, record.getListField("list"));
+
+ record = new ZNRecord("id7");
+ List<String> list2 = Arrays.asList("value3", "value4");
+ record.setListField("list", list2);
+ ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
+ record = _zkClient.readData(path);
+ AssertJUnit.assertEquals(list2, record.getListField("list"));
+
+
+ Map<String, String> map = new HashMap<String, String>() {{put("k1", "v1");}};
+ record.setMapField("map", map);
+ ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
+ record = _zkClient.readData(path);
+ AssertJUnit.assertEquals(map, record.getMapField("map"));
+
+ record = new ZNRecord("id7");
+ Map<String, String> map2 = new HashMap<String, String>() {{put("k2", "v2");}};
+ record.setMapField("map", map2);
+ ZKUtil.createOrUpdate(_zkClient, path, record, true, true);
+ record = _zkClient.readData(path);
+ AssertJUnit.assertEquals(new HashMap<String, String>() {{
+ put("k2", "v2");
+ }}, record.getMapField("map"));
+ }
}
[2/2] helix git commit: Allow user to enable persisting preference
list and best possible state map into IdealState in full-auto mode.
Posted by jx...@apache.org.
Allow user to enable persisting preference list and best possible state map into IdealState in full-auto mode.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8ba068e7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8ba068e7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8ba068e7
Branch: refs/heads/helix-0.6.x
Commit: 8ba068e7b78aedf4743f2da57670384534d1d4f8
Parents: 7c92bf5
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue May 23 13:58:24 2017 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue May 23 13:58:24 2017 -0700
----------------------------------------------------------------------
.../stages/BestPossibleStateCalcStage.java | 3 +
.../stages/BestPossibleStateOutput.java | 42 +++++
.../stages/PersistAssignmentStage.java | 172 ++++++++++---------
.../java/org/apache/helix/model/IdealState.java | 65 +++++--
.../TestRebalancerPersistAssignments.java | 126 +++++---------
5 files changed, 227 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/8ba068e7/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index cba0659..526f532 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -127,6 +127,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
rebalancer.init(manager);
idealState =
rebalancer.computeNewIdealState(resourceName, idealState, currentStateOutput, cache);
+ output.setPreferenceLists(resourceName, idealState.getPreferenceLists());
// Use the internal MappingCalculator interface to compute the final assignment
// The next release will support rebalancers that compute the mapping from start to finish
@@ -180,6 +181,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
rebalancer = customizedRebalancer;
break;
default:
+ logger.error(
+ "Fail to find the rebalancer, invalid rebalance mode " + idealState.getRebalanceMode());
break;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8ba068e7/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
index 168a3b0..a3ad56d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.stages;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,6 +30,8 @@ import org.apache.helix.model.Partition;
public class BestPossibleStateOutput {
// Map of resource->partition->instance->state
Map<String, Map<Partition, Map<String, String>>> _stateMap;
+ /* resource -> partition -> preference list */
+ private Map<String, Map<String, List<String>>> _preferenceLists;
public BestPossibleStateOutput() {
_stateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
@@ -77,6 +80,45 @@ public class BestPossibleStateOutput {
return _stateMap;
}
+ public Map<String, Map<String, List<String>>> getPreferenceLists() {
+ return _preferenceLists;
+ }
+
+ public Map<String, List<String>> getPreferenceLists(String resource) {
+ if (_preferenceLists != null && _preferenceLists.containsKey(resource)) {
+ return _preferenceLists.get(resource);
+ }
+
+ return null;
+ }
+
+ public List<String> getPreferenceList(String resource, String partition) {
+ if (_preferenceLists != null && _preferenceLists.containsKey(resource) && _preferenceLists
+ .get(resource).containsKey(partition)) {
+ return _preferenceLists.get(resource).get(partition);
+ }
+
+ return null;
+ }
+
+ public void setPreferenceList(String resource, String partition, List<String> list) {
+ if (_preferenceLists == null) {
+ _preferenceLists = new HashMap<String, Map<String, List<String>>>();
+ }
+ if (!_preferenceLists.containsKey(resource)) {
+ _preferenceLists.put(resource, new HashMap<String, List<String>>());
+ }
+ _preferenceLists.get(resource).put(partition, list);
+ }
+
+ public void setPreferenceLists(String resource,
+ Map<String, List<String>> resourcePreferenceLists) {
+ if (_preferenceLists == null) {
+ _preferenceLists = new HashMap<String, Map<String, List<String>>>();
+ }
+ _preferenceLists.put(resource, resourcePreferenceLists);
+ }
+
@Override
public String toString() {
return _stateMap.toString();
http://git-wip-us.apache.org/repos/asf/helix/blob/8ba068e7/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 9c297f8..425b38b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -19,19 +19,16 @@ package org.apache.helix.controller.stages;
* under the License.
*/
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-
+import java.util.Set;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.log4j.Logger;
@@ -49,56 +46,58 @@ public class PersistAssignmentStage extends AbstractBaseStage {
ClusterDataCache cache = event.getAttribute("ClusterDataCache");
ClusterConfig clusterConfig = cache.getClusterConfig();
- if (clusterConfig.isPersistBestPossibleAssignment()) {
- HelixManager helixManager = event.getAttribute("helixmanager");
- HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- BestPossibleStateOutput bestPossibleAssignments =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-
- for (String resourceId : bestPossibleAssignments.resourceSet()) {
- Resource resource = resourceMap.get(resourceId);
- if (resource != null) {
- boolean changed = false;
- Map<Partition, Map<String, String>> bestPossibleAssignment =
- bestPossibleAssignments.getResourceMap(resourceId);
- IdealState idealState = cache.getIdealState(resourceId);
- if (idealState == null) {
- LOG.warn("IdealState not found for resource " + resourceId);
- continue;
- }
- IdealState.RebalanceMode mode = idealState.getRebalanceMode();
- if (!mode.equals(IdealState.RebalanceMode.SEMI_AUTO) && !mode
- .equals(IdealState.RebalanceMode.FULL_AUTO)) {
- // do not persist assignment for resource in neither semi or full auto.
- continue;
- }
+ if (!clusterConfig.isPersistBestPossibleAssignment()) {
+ return;
+ }
- //TODO: temporary solution for Espresso/Dbus backcompatible, should remove this.
- Map<Partition, Map<String, String>> assignmentToPersist =
- convertAssignmentPersisted(resource, idealState, bestPossibleAssignment);
-
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> instanceMap = assignmentToPersist.get(partition);
- Map<String, String> existInstanceMap =
- idealState.getInstanceStateMap(partition.getPartitionName());
- if (instanceMap == null && existInstanceMap == null) {
- continue;
- }
- if (instanceMap == null || existInstanceMap == null || !instanceMap
- .equals(existInstanceMap)) {
- changed = true;
- break;
- }
+ BestPossibleStateOutput bestPossibleAssignment =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+
+ HelixManager helixManager = event.getAttribute("helixmanager");
+ HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+
+ for (String resourceId : bestPossibleAssignment.resourceSet()) {
+ Resource resource = resourceMap.get(resourceId);
+ if (resource != null) {
+ final IdealState idealState = cache.getIdealState(resourceId);
+ if (idealState == null) {
+ LOG.warn("IdealState not found for resource " + resourceId);
+ continue;
+ }
+ IdealState.RebalanceMode mode = idealState.getRebalanceMode();
+ if (!mode.equals(IdealState.RebalanceMode.SEMI_AUTO) && !mode
+ .equals(IdealState.RebalanceMode.FULL_AUTO)) {
+ // do not persist assignment for resource in neither semi or full auto.
+ continue;
+ }
+
+ boolean needPersist = false;
+ if (mode.equals(IdealState.RebalanceMode.FULL_AUTO)) {
+ // persist preference list in ful-auto mode.
+ Map<String, List<String>> newLists =
+ bestPossibleAssignment.getPreferenceLists(resourceId);
+ if (newLists != null && hasPreferenceListChanged(newLists, idealState)) {
+ idealState.setPreferenceLists(newLists);
+ needPersist = true;
}
- if (changed) {
- for (Partition partition : assignmentToPersist.keySet()) {
- Map<String, String> instanceMap = assignmentToPersist.get(partition);
- idealState.setInstanceStateMap(partition.getPartitionName(), instanceMap);
- }
- accessor.setProperty(keyBuilder.idealStates(resourceId), idealState);
+ }
+
+ Map<Partition, Map<String, String>> bestPossibleAssignements =
+ bestPossibleAssignment.getResourceMap(resourceId);
+
+ if (bestPossibleAssignements != null && hasInstanceMapChanged(bestPossibleAssignements,
+ idealState)) {
+ for (Partition partition : bestPossibleAssignements.keySet()) {
+ Map<String, String> instanceMap = bestPossibleAssignements.get(partition);
+ idealState.setInstanceStateMap(partition.getPartitionName(), instanceMap);
}
+ needPersist = true;
+ }
+
+ if (needPersist) {
+ accessor.setProperty(keyBuilder.idealStates(resourceId), idealState);
}
}
}
@@ -108,47 +107,50 @@ public class PersistAssignmentStage extends AbstractBaseStage {
}
/**
- * TODO: This is a temporary hacky for back-compatible support of Espresso and Databus,
- * we should get rid of this conversion as soon as possible.
- * --- Lei, 2016/9/9.
+ * has the preference list changed from the one persisted in current IdealState
*/
- private Map<Partition, Map<String, String>> convertAssignmentPersisted(Resource resource,
- IdealState idealState, Map<Partition, Map<String, String>> bestPossibleAssignment) {
- String stateModelDef = idealState.getStateModelDefRef();
- /** Only convert for MasterSlave resources */
- if (!stateModelDef.equals(BuiltInStateModelDefinitions.MasterSlave.name())) {
- return bestPossibleAssignment;
+ private boolean hasPreferenceListChanged(Map<String, List<String>> newLists,
+ IdealState idealState) {
+ Map<String, List<String>> existLists = idealState.getPreferenceLists();
+
+ Set<String> partitions = new HashSet<String>(newLists.keySet());
+ partitions.addAll(existLists.keySet());
+
+ for (String partition : partitions) {
+ List<String> assignedInstances = newLists.get(partition);
+ List<String> existingInstances = existLists.get(partition);
+ if (assignedInstances == null && existingInstances == null) {
+ continue;
+ }
+ if (assignedInstances == null || existingInstances == null || !assignedInstances
+ .equals(existingInstances)) {
+ return true;
+ }
}
- Map<Partition, Map<String, String>> assignmentToPersist =
- new HashMap<Partition, Map<String, String>>();
-
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> instanceMap = new HashMap<String, String>();
- instanceMap.putAll(bestPossibleAssignment.get(partition));
+ return false;
+ }
- List<String> preferenceList = idealState.getPreferenceList(partition.getPartitionName());
- boolean hasMaster = false;
- for (String ins : preferenceList) {
- String state = instanceMap.get(ins);
- if (state == null || (!state.equals(MasterSlaveSMD.States.SLAVE.name()) && !state
- .equals(MasterSlaveSMD.States.MASTER.name()))) {
- instanceMap.put(ins, MasterSlaveSMD.States.SLAVE.name());
- }
+ private boolean hasInstanceMapChanged(Map<Partition, Map<String, String>> newAssiments,
+ IdealState idealState) {
+ Set<Partition> partitions = new HashSet<Partition>(newAssiments.keySet());
+ for (String p : idealState.getPartitionSet()) {
+ partitions.add(new Partition(p));
+ }
- if (state != null && state.equals(MasterSlaveSMD.States.MASTER.name())) {
- hasMaster = true;
- }
+ for (Partition partition : partitions) {
+ Map<String, String> instanceMap = newAssiments.get(partition);
+ Map<String, String> existInstanceMap =
+ idealState.getInstanceStateMap(partition.getPartitionName());
+ if (instanceMap == null && existInstanceMap == null) {
+ continue;
}
-
- // if no master, just pick the first node in the preference list as the master.
- if (!hasMaster && preferenceList.size() > 0) {
- instanceMap.put(preferenceList.get(0), MasterSlaveSMD.States.MASTER.name());
+ if (instanceMap == null || existInstanceMap == null || !instanceMap
+ .equals(existInstanceMap)) {
+ return true;
}
-
- assignmentToPersist.put(partition, instanceMap);
}
- return assignmentToPersist;
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8ba068e7/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 907bd27..48e43d6 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -352,6 +352,9 @@ public class IdealState extends HelixProperty {
/**
* Get the current mapping of a partition
+ * CAUTION: In FULL-AUTO mode, this method could return empty map if
+ * {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)} is set to true.
+ *
* @param partitionName the name of the partition
* @return the instances where the replicas live and the state of each
*/
@@ -371,37 +374,75 @@ public class IdealState extends HelixProperty {
}
/**
- * Get the instances who host replicas of a partition
+ * Get the instances who host replicas of a partition.
+ * CAUTION: In FULL-AUTO mode, this method could return empty map if
+ * {@link ClusterConfig#setPersistBestPossibleAssignment(Boolean)} is set to true.
+ +
* @param partitionName the partition to look up
* @return set of instance names
*/
public Set<String> getInstanceSet(String partitionName) {
- if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
- || getRebalanceMode() == RebalanceMode.FULL_AUTO
- || getRebalanceMode() == RebalanceMode.USER_DEFINED
- || getRebalanceMode() == RebalanceMode.TASK) {
+ switch (getRebalanceMode()) {
+ case FULL_AUTO:
+ case SEMI_AUTO:
+ case USER_DEFINED:
+ case TASK:
List<String> prefList = _record.getListField(partitionName);
- if (prefList != null) {
+ if (prefList != null && !prefList.isEmpty()) {
return new TreeSet<String>(prefList);
} else {
- logger.warn(partitionName + " does NOT exist");
- return Collections.emptySet();
+ Map<String, String> stateMap = _record.getMapField(partitionName);
+ if (stateMap != null && !stateMap.isEmpty()) {
+ return new TreeSet<String>(stateMap.keySet());
+ } else {
+ logger.warn(partitionName + " does NOT exist");
+ }
}
- } else if (getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+ break;
+ case CUSTOMIZED:
Map<String, String> stateMap = _record.getMapField(partitionName);
if (stateMap != null) {
return new TreeSet<String>(stateMap.keySet());
} else {
logger.warn(partitionName + " does NOT exist");
- return Collections.emptySet();
}
- } else {
+ break;
+ case NONE:
+ default:
logger.error("Invalid ideal state mode: " + getResourceName());
- return Collections.emptySet();
+ break;
}
+ return Collections.emptySet();
+ }
+
+ /** Set the preference list of a partition
+ * @param partitionName the name of the partition
+ * @param instanceList the instance preference list
+ */
+ public void setPreferenceList(String partitionName, List<String> instanceList) {
+ _record.setListField(partitionName, instanceList);
+ }
+
+ /**
+ * Set the preference lists for all partitions in this resource.
+ *
+ * @param instanceLists the map of instance preference lists.
+ */
+ public void setPreferenceLists(Map<String, List<String>> instanceLists) {
+ _record.setListFields(instanceLists);
+ }
+
+ /**
+ * Get the preference lists for all partitions
+ *
+ * @return map of lists of instances for all partitions in this resource.
+ */
+ public Map<String, List<String>> getPreferenceLists() {
+ return _record.getListFields();
}
+
/**
* Get the preference list of a partition
* @param partitionName the name of the partition
http://git-wip-us.apache.org/repos/asf/helix/blob/8ba068e7/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
index 3aec847..2a9dc69 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
@@ -24,11 +24,8 @@ import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -42,6 +39,8 @@ import java.util.Map;
import java.util.Set;
public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
+ Set<String> _instanceNames = new HashSet<String>();
+
@Override
@BeforeClass
public void beforeClass() throws Exception {
@@ -69,13 +68,14 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
// start dummy participants
for (int i = 0; i < NODE_NR; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _instanceNames.add(instanceName);
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
_participants[i].syncStart();
}
}
@DataProvider(name = "rebalanceModes")
- public static RebalanceMode [][] rebalanceModes() {
+ public static Object [][] rebalanceModes() {
return new RebalanceMode[][] { {RebalanceMode.SEMI_AUTO}, {RebalanceMode.FULL_AUTO}};
}
@@ -88,23 +88,25 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
BuiltInStateModelDefinitions.LeaderStandby.name(), rebalanceMode.name());
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
- HelixClusterVerifier verifier =
+ BestPossibleExternalViewVerifier.Builder verifierBuilder =
new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
- .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
- Thread.sleep(500);
- Assert.assertTrue(verifier.verify());
+ .setResources(new HashSet<String>(Collections.singleton(testDb)));
+
+ Assert.assertTrue(verifierBuilder.build().verify());
// kill 1 node
_participants[0].syncStop();
- Assert.assertTrue(verifier.verify());
+ Set<String> liveInstances = new HashSet<String>(_instanceNames);
+ liveInstances.remove(_participants[0].getInstanceName());
+ verifierBuilder.setExpectLiveInstances(liveInstances);
+ Assert.assertTrue(verifierBuilder.build().verify());
IdealState idealState =
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
Set<String> excludedInstances = new HashSet<String>();
excludedInstances.add(_participants[0].getInstanceName());
- Thread.sleep(2000);
verifyAssignmentInIdealStateWithPersistDisabled(idealState, excludedInstances);
// clean up
@@ -124,10 +126,11 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
BuiltInStateModelDefinitions.LeaderStandby.name(), rebalanceMode.name());
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
- HelixClusterVerifier verifier =
+ BestPossibleExternalViewVerifier.Builder verifierBuilder =
new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
- .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
- Assert.assertTrue(verifier.verify());
+ .setResources(new HashSet<String>(Collections.singleton(testDb)));
+
+ Assert.assertTrue(verifierBuilder.build().verify());
IdealState idealState =
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
@@ -136,9 +139,10 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
// kill 1 node
_participants[0].syncStop();
- Boolean result = ClusterStateVerifier.verifyByZkCallback(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
+ Set<String> liveInstances = new HashSet<String>(_instanceNames);
+ liveInstances.remove(_participants[0].getInstanceName());
+ verifierBuilder.setExpectLiveInstances(liveInstances);
+ Assert.assertTrue(verifierBuilder.build().verify());
idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
// verify that IdealState contains updated assignment in it map fields.
@@ -154,72 +158,8 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
_participants[0].syncStart();
}
- /**
- * This test is to test the temporary solution for solving Espresso/Databus back-compatible map format issue.
- *
- * @throws Exception
- */
- @Test(dependsOnMethods = { "testDisablePersist" })
- public void testSemiAutoEnablePersistMasterSlave() throws Exception {
- String testDb = "TestDB1-MasterSlave";
- enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
-
- _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, BuiltInStateModelDefinitions.MasterSlave.name(),
- RebalanceMode.SEMI_AUTO.name());
- _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
-
- HelixClusterVerifier verifier =
- new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
- .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
- Assert.assertTrue(verifier.verify());
-
- IdealState idealState =
- _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
- verifySemiAutoMasterSlaveAssignment(idealState);
-
- // kill 1 node
- _participants[0].syncStop();
-
- Assert.assertTrue(verifier.verify());
-
- idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
- verifySemiAutoMasterSlaveAssignment(idealState);
-
- // disable an instance
- _setupTool.getClusterManagementTool()
- .enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), false);
- idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
- verifySemiAutoMasterSlaveAssignment(idealState);
-
- // clean up
- _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
- _setupTool.getClusterManagementTool()
- .enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), true);
- _participants[0].reset();
- _participants[0].syncStart();
- }
-
- private void verifySemiAutoMasterSlaveAssignment(IdealState idealState) {
- for (String partition : idealState.getPartitionSet()) {
- Map<String, String> instanceStateMap = idealState.getInstanceStateMap(partition);
- List<String> preferenceList = idealState.getPreferenceList(partition);
- int numMaster = 0;
-
- for (String ins : preferenceList) {
- Assert.assertTrue(instanceStateMap.containsKey(ins));
- String state = instanceStateMap.get(ins);
- Assert.assertTrue(state.equals(MasterSlaveSMD.States.MASTER.name()) || state
- .equals(MasterSlaveSMD.States.SLAVE.name()));
- if (state.equals(MasterSlaveSMD.States.MASTER.name())) {
- numMaster++;
- }
- }
-
- Assert.assertEquals(numMaster, 1);
- }
- }
-
- // verify that the disabled or failed instance should not be included in bestPossible assignment.
+ // verify that both list field and map field should be persisted in IS,
+ // And the disabled or failed instance should not be included in bestPossible assignment.
private void verifyAssignmentInIdealStateWithPersistEnabled(IdealState idealState,
Set<String> excludedInstances) {
for (String partition : idealState.getPartitionSet()) {
@@ -228,8 +168,20 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
Assert.assertFalse(instanceStateMap.isEmpty());
Set<String> instancesInMap = instanceStateMap.keySet();
- Set<String> instanceInList = idealState.getInstanceSet(partition);
- Assert.assertTrue(instanceInList.containsAll(instancesInMap));
+ if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+ Set<String> instanceInList = idealState.getInstanceSet(partition);
+ Assert.assertTrue(instanceInList.containsAll(instancesInMap));
+ }
+
+ if(idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+ // preference list should be persisted in IS.
+ List<String> instanceList = idealState.getPreferenceList(partition);
+ Assert.assertNotNull(instanceList);
+ Assert.assertFalse(instanceList.isEmpty());
+ for (String ins : excludedInstances) {
+ Assert.assertFalse(instanceList.contains(ins));
+ }
+ }
for (String ins : excludedInstances) {
Assert.assertFalse(instancesInMap.contains(ins));
@@ -254,6 +206,12 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
// if at least one excluded instance is included, it means assignment was not updated.
assignmentNotChanged = true;
}
+ if(idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+ List<String> instanceList = idealState.getPreferenceList(partition);
+ if (instanceList.contains(ins)) {
+ assignmentNotChanged = true;
+ }
+ }
}
}