You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 18:00:01 UTC
[26/38] helix git commit: Persist assignment map using specific
format for MasterSlave resources (This is a short-term solution,
we should get rid of this asap).
Persist assignment map using specific format for MasterSlave resources (This is a short-term solution, we should get rid of this asap).
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8c58cf34
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8c58cf34
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8c58cf34
Branch: refs/heads/helix-0.6.x
Commit: 8c58cf34df1c72b58609f75b78d0b14a3b219fc3
Parents: 092b73a
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Sep 9 13:31:14 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Feb 8 09:53:22 2017 -0800
----------------------------------------------------------------------
.../stages/PersistAssignmentStage.java | 70 +++++++++++++--
.../TestRebalancerPersistAssignments.java | 95 ++++++++++++++------
2 files changed, 131 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/8c58cf34/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index ea49234..3c6c1ce 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -19,16 +19,19 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.log4j.Logger;
@@ -50,15 +53,16 @@ public class PersistAssignmentStage extends AbstractBaseStage {
HelixManager helixManager = event.getAttribute("helixmanager");
HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- BestPossibleStateOutput assignments =
+ BestPossibleStateOutput bestPossibleAssignments =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- for (String resourceId : assignments.resourceSet()) {
+ for (String resourceId : bestPossibleAssignments.resourceSet()) {
Resource resource = resourceMap.get(resourceId);
if (resource != null) {
boolean changed = false;
- Map<Partition, Map<String, String>> assignment = assignments.getResourceMap(resourceId);
+ Map<Partition, Map<String, String>> bestPossibleAssignment =
+ bestPossibleAssignments.getResourceMap(resourceId);
IdealState idealState = cache.getIdealState(resourceId);
if (idealState == null) {
LOG.warn("IdealState not found for resource " + resourceId);
@@ -70,8 +74,13 @@ public class PersistAssignmentStage extends AbstractBaseStage {
// do not persist assignment for resource in neither semi or full auto.
continue;
}
+
+ // TODO: temporary solution for Espresso/Databus backward compatibility; should remove this.
+ Map<Partition, Map<String, String>> assignmentToPersist =
+ convertAssignmentPersisted(resource, idealState, bestPossibleAssignment);
+
for (Partition partition : resource.getPartitions()) {
- Map<String, String> instanceMap = assignment.get(partition);
+ Map<String, String> instanceMap = assignmentToPersist.get(partition);
Map<String, String> existInstanceMap =
idealState.getInstanceStateMap(partition.getPartitionName());
if (instanceMap == null && existInstanceMap == null) {
@@ -84,8 +93,8 @@ public class PersistAssignmentStage extends AbstractBaseStage {
}
}
if (changed) {
- for (Partition partition : assignment.keySet()) {
- Map<String, String> instanceMap = assignment.get(partition);
+ for (Partition partition : assignmentToPersist.keySet()) {
+ Map<String, String> instanceMap = assignmentToPersist.get(partition);
idealState.setInstanceStateMap(partition.getPartitionName(), instanceMap);
}
accessor.setProperty(keyBuilder.idealStates(resourceId), idealState);
@@ -97,4 +106,49 @@ public class PersistAssignmentStage extends AbstractBaseStage {
long endTime = System.currentTimeMillis();
LOG.info("END PersistAssignmentStage.process(), took " + (endTime - startTime) + " ms");
}
+
+ /**
+ * TODO: This is a temporary hack for backward-compatible support of Espresso and Databus;
+ * we should get rid of this conversion as soon as possible.
+ * --- Lei, 2016/9/9.
+ */
+ private Map<Partition, Map<String, String>> convertAssignmentPersisted(Resource resource,
+ IdealState idealState, Map<Partition, Map<String, String>> bestPossibleAssignment) {
+ String stateModelDef = idealState.getStateModelDefRef();
+ /** Only convert for MasterSlave resources */
+ if (!stateModelDef.equals(BuiltInStateModelDefinitions.MasterSlave.name())) {
+ return bestPossibleAssignment;
+ }
+
+ Map<Partition, Map<String, String>> assignmentToPersist =
+ new HashMap<Partition, Map<String, String>>();
+
+ for (Partition partition : resource.getPartitions()) {
+ Map<String, String> instanceMap = new HashMap<String, String>();
+ instanceMap.putAll(bestPossibleAssignment.get(partition));
+
+ List<String> preferenceList = idealState.getPreferenceList(partition.getPartitionName());
+ boolean hasMaster = false;
+ for (String ins : preferenceList) {
+ String state = instanceMap.get(ins);
+ if (state == null || (!state.equals(MasterSlaveSMD.States.SLAVE.name()) && !state
+ .equals(MasterSlaveSMD.States.MASTER.name()))) {
+ instanceMap.put(ins, MasterSlaveSMD.States.SLAVE.name());
+ }
+
+ if (state != null && state.equals(MasterSlaveSMD.States.MASTER.name())) {
+ hasMaster = true;
+ }
+ }
+
+ // if no master, just pick the first node in the preference list as the master.
+ if (!hasMaster) {
+ instanceMap.put(preferenceList.get(0), MasterSlaveSMD.States.MASTER.name());
+ }
+
+ assignmentToPersist.put(partition, instanceMap);
+ }
+
+ return assignmentToPersist;
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8c58cf34/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
index fd0cc64..382ef1b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
@@ -27,6 +27,7 @@ import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier.BestPossibleExternalViewVerifier;
@@ -40,6 +41,7 @@ import org.testng.annotations.Test;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -78,9 +80,7 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
@DataProvider(name = "rebalanceModes")
public static RebalanceMode [][] rebalanceModes() {
- return new RebalanceMode[][] { {RebalanceMode.FULL_AUTO},
- {RebalanceMode.SEMI_AUTO}
- };
+ return new RebalanceMode[][] { {RebalanceMode.SEMI_AUTO}, {RebalanceMode.FULL_AUTO}};
}
@Test(dataProvider = "rebalanceModes")
@@ -95,6 +95,7 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
HelixClusterVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
.setResources(new HashSet<String>(Collections.singleton(testDb))).build();
+ Thread.sleep(500);
Assert.assertTrue(verifier.verify());
// kill 1 node
@@ -107,6 +108,7 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
Set<String> excludedInstances = new HashSet<String>();
excludedInstances.add(_participants[0].getInstanceName());
+ Thread.sleep(2000);
verifyAssignmentInIdealStateWithPersistDisabled(idealState, excludedInstances);
// clean up
@@ -122,13 +124,14 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
String testDb = "TestDB1-" + rebalanceMode.name();
enablePersistAssignment(true);
- _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 30,
- BuiltInStateModelDefinitions.MasterSlave.name(), rebalanceMode.name());
+ _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+ BuiltInStateModelDefinitions.LeaderStandby.name(), rebalanceMode.name());
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
- boolean result = ClusterStateVerifier.verifyByZkCallback(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
+ HelixClusterVerifier verifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
+ Assert.assertTrue(verifier.verify());
IdealState idealState =
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
@@ -137,7 +140,7 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
// kill 1 node
_participants[0].syncStop();
- result = ClusterStateVerifier.verifyByZkCallback(
+ Boolean result = ClusterStateVerifier.verifyByZkCallback(
new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
Assert.assertTrue(result);
@@ -148,36 +151,76 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
excludedInstances.add(_participants[0].getInstanceName());
verifyAssignmentInIdealStateWithPersistEnabled(idealState, excludedInstances);
+ // clean up
_setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+ _participants[0] =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _participants[0].getInstanceName());
+ _participants[0].syncStart();
}
- @Test(dataProvider = "rebalanceModes")
- public void testAutoRebalanceWithPersistAssignmentDisabled(RebalanceMode rebalanceMode)
- throws Exception {
- String testDb = "TestDB2-" + rebalanceMode.name();
- _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 30,
- BuiltInStateModelDefinitions.MasterSlave.name(), rebalanceMode.name());
+ /**
+ * Tests the temporary solution for the Espresso/Databus backward-compatible map format issue.
+ *
+ * @throws Exception
+ */
+ @Test(dependsOnMethods = { "testDisablePersist" })
+ public void testSemiAutoEnablePersistMasterSlave() throws Exception {
+ String testDb = "TestDB1-MasterSlave";
+ enablePersistAssignment(true);
+
+ _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+ BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.SEMI_AUTO.name());
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
- boolean result = ClusterStateVerifier.verifyByZkCallback(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
+ HelixClusterVerifier verifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
+ Assert.assertTrue(verifier.verify());
+
+ IdealState idealState =
+ _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+ verifySemiAutoMasterSlaveAssignment(idealState);
// kill 1 node
_participants[0].syncStop();
- result = ClusterStateVerifier.verifyByZkCallback(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
+ Assert.assertTrue(verifier.verify());
- IdealState idealState =
- _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+ idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+ verifySemiAutoMasterSlaveAssignment(idealState);
- Set<String> excludedInstances = new HashSet<String>();
- excludedInstances.add(_participants[0].getInstanceName());
- verifyAssignmentInIdealStateWithPersistDisabled(idealState, excludedInstances);
+ // disable an instance
+ _setupTool.getClusterManagementTool()
+ .enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), false);
+ idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+ verifySemiAutoMasterSlaveAssignment(idealState);
+ // clean up
_setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+ _setupTool.getClusterManagementTool()
+ .enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), true);
+ _participants[0].reset();
+ _participants[0].syncStart();
+ }
+
+ private void verifySemiAutoMasterSlaveAssignment(IdealState idealState) {
+ for (String partition : idealState.getPartitionSet()) {
+ Map<String, String> instanceStateMap = idealState.getInstanceStateMap(partition);
+ List<String> preferenceList = idealState.getPreferenceList(partition);
+ int numMaster = 0;
+
+ for (String ins : preferenceList) {
+ Assert.assertTrue(instanceStateMap.containsKey(ins));
+ String state = instanceStateMap.get(ins);
+ Assert.assertTrue(state.equals(MasterSlaveSMD.States.MASTER.name()) || state
+ .equals(MasterSlaveSMD.States.SLAVE.name()));
+ if (state.equals(MasterSlaveSMD.States.MASTER.name())) {
+ numMaster++;
+ }
+ }
+
+ Assert.assertEquals(numMaster, 1);
+ }
}
private void enablePersistAssignment(Boolean enable) {