You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 18:00:01 UTC

[26/38] helix git commit: Persist assignment map using specific format for MasterSlave resources (This is a short-term solution, we should get rid of this asap).

Persist assignment map using specific format for MasterSlave resources (This is a short-term solution, we should get rid of this asap).


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8c58cf34
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8c58cf34
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8c58cf34

Branch: refs/heads/helix-0.6.x
Commit: 8c58cf34df1c72b58609f75b78d0b14a3b219fc3
Parents: 092b73a
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Sep 9 13:31:14 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Feb 8 09:53:22 2017 -0800

----------------------------------------------------------------------
 .../stages/PersistAssignmentStage.java          | 70 +++++++++++++--
 .../TestRebalancerPersistAssignments.java       | 95 ++++++++++++++------
 2 files changed, 131 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8c58cf34/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index ea49234..3c6c1ce 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -19,16 +19,19 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.log4j.Logger;
@@ -50,15 +53,16 @@ public class PersistAssignmentStage extends AbstractBaseStage {
       HelixManager helixManager = event.getAttribute("helixmanager");
       HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
       PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-      BestPossibleStateOutput assignments =
+      BestPossibleStateOutput bestPossibleAssignments =
           event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
       Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
 
-      for (String resourceId : assignments.resourceSet()) {
+      for (String resourceId : bestPossibleAssignments.resourceSet()) {
         Resource resource = resourceMap.get(resourceId);
         if (resource != null) {
           boolean changed = false;
-          Map<Partition, Map<String, String>> assignment = assignments.getResourceMap(resourceId);
+          Map<Partition, Map<String, String>> bestPossibleAssignment =
+              bestPossibleAssignments.getResourceMap(resourceId);
           IdealState idealState = cache.getIdealState(resourceId);
           if (idealState == null) {
             LOG.warn("IdealState not found for resource " + resourceId);
@@ -70,8 +74,13 @@ public class PersistAssignmentStage extends AbstractBaseStage {
             // do not persist assignment for resource in neither semi or full auto.
             continue;
           }
+
+          //TODO: temporary solution for Espresso/Dbus backcompatible, should remove this.
+          Map<Partition, Map<String, String>> assignmentToPersist =
+              convertAssignmentPersisted(resource, idealState, bestPossibleAssignment);
+
           for (Partition partition : resource.getPartitions()) {
-            Map<String, String> instanceMap = assignment.get(partition);
+            Map<String, String> instanceMap = assignmentToPersist.get(partition);
             Map<String, String> existInstanceMap =
                 idealState.getInstanceStateMap(partition.getPartitionName());
             if (instanceMap == null && existInstanceMap == null) {
@@ -84,8 +93,8 @@ public class PersistAssignmentStage extends AbstractBaseStage {
             }
           }
           if (changed) {
-            for (Partition partition : assignment.keySet()) {
-              Map<String, String> instanceMap = assignment.get(partition);
+            for (Partition partition : assignmentToPersist.keySet()) {
+              Map<String, String> instanceMap = assignmentToPersist.get(partition);
               idealState.setInstanceStateMap(partition.getPartitionName(), instanceMap);
             }
             accessor.setProperty(keyBuilder.idealStates(resourceId), idealState);
@@ -97,4 +106,49 @@ public class PersistAssignmentStage extends AbstractBaseStage {
     long endTime = System.currentTimeMillis();
     LOG.info("END PersistAssignmentStage.process(), took " + (endTime - startTime) + " ms");
   }
+
+  /**
+   * TODO: This is a temporary hacky for back-compatible support of Espresso and Databus,
+   * we should get rid of this conversion as soon as possible.
+   * --- Lei, 2016/9/9.
+   */
+  private Map<Partition, Map<String, String>> convertAssignmentPersisted(Resource resource,
+      IdealState idealState, Map<Partition, Map<String, String>> bestPossibleAssignment) {
+    String stateModelDef = idealState.getStateModelDefRef();
+    /** Only convert for MasterSlave resources */
+    if (!stateModelDef.equals(BuiltInStateModelDefinitions.MasterSlave.name())) {
+      return bestPossibleAssignment;
+    }
+
+    Map<Partition, Map<String, String>> assignmentToPersist =
+        new HashMap<Partition, Map<String, String>>();
+
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> instanceMap = new HashMap<String, String>();
+      instanceMap.putAll(bestPossibleAssignment.get(partition));
+
+      List<String> preferenceList = idealState.getPreferenceList(partition.getPartitionName());
+      boolean hasMaster = false;
+      for (String ins : preferenceList) {
+        String state = instanceMap.get(ins);
+        if (state == null || (!state.equals(MasterSlaveSMD.States.SLAVE.name()) && !state
+            .equals(MasterSlaveSMD.States.MASTER.name()))) {
+          instanceMap.put(ins, MasterSlaveSMD.States.SLAVE.name());
+        }
+
+        if (state != null && state.equals(MasterSlaveSMD.States.MASTER.name())) {
+          hasMaster = true;
+        }
+      }
+
+      // if no master, just pick the first node in the preference list as the master.
+      if (!hasMaster) {
+        instanceMap.put(preferenceList.get(0), MasterSlaveSMD.States.MASTER.name());
+      }
+
+      assignmentToPersist.put(partition, instanceMap);
+    }
+
+    return assignmentToPersist;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8c58cf34/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
index fd0cc64..382ef1b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
@@ -27,6 +27,7 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossibleExternalViewVerifier;
@@ -40,6 +41,7 @@ import org.testng.annotations.Test;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -78,9 +80,7 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
 
   @DataProvider(name = "rebalanceModes")
   public static RebalanceMode [][] rebalanceModes() {
-    return new RebalanceMode[][] { {RebalanceMode.FULL_AUTO},
-        {RebalanceMode.SEMI_AUTO}
-    };
+    return new RebalanceMode[][] { {RebalanceMode.SEMI_AUTO}, {RebalanceMode.FULL_AUTO}};
   }
 
   @Test(dataProvider = "rebalanceModes")
@@ -95,6 +95,7 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
     HelixClusterVerifier verifier =
         new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
+    Thread.sleep(500);
     Assert.assertTrue(verifier.verify());
 
     // kill 1 node
@@ -107,6 +108,7 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
 
     Set<String> excludedInstances = new HashSet<String>();
     excludedInstances.add(_participants[0].getInstanceName());
+    Thread.sleep(2000);
     verifyAssignmentInIdealStateWithPersistDisabled(idealState, excludedInstances);
 
     // clean up
@@ -122,13 +124,14 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
     String testDb = "TestDB1-" + rebalanceMode.name();
     enablePersistAssignment(true);
 
-    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 30,
-        BuiltInStateModelDefinitions.MasterSlave.name(), rebalanceMode.name());
+    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+        BuiltInStateModelDefinitions.LeaderStandby.name(), rebalanceMode.name());
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
 
-    boolean result = ClusterStateVerifier.verifyByZkCallback(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
+    HelixClusterVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
+    Assert.assertTrue(verifier.verify());
 
     IdealState idealState =
         _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
@@ -137,7 +140,7 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
     // kill 1 node
     _participants[0].syncStop();
 
-    result = ClusterStateVerifier.verifyByZkCallback(
+    Boolean result = ClusterStateVerifier.verifyByZkCallback(
         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
 
@@ -148,36 +151,76 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
     excludedInstances.add(_participants[0].getInstanceName());
     verifyAssignmentInIdealStateWithPersistEnabled(idealState, excludedInstances);
 
+    // clean up
     _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+    _participants[0] =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _participants[0].getInstanceName());
+    _participants[0].syncStart();
   }
 
-  @Test(dataProvider = "rebalanceModes")
-  public void testAutoRebalanceWithPersistAssignmentDisabled(RebalanceMode rebalanceMode)
-      throws Exception {
-    String testDb = "TestDB2-" + rebalanceMode.name();
-    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 30,
-        BuiltInStateModelDefinitions.MasterSlave.name(), rebalanceMode.name());
+  /**
+   * This test is to test the temporary solution for solving Espresso/Databus back-compatible map format issue.
+   *
+   * @throws Exception
+   */
+  @Test(dependsOnMethods = { "testDisablePersist" })
+  public void testSemiAutoEnablePersistMasterSlave() throws Exception {
+    String testDb = "TestDB1-MasterSlave";
+    enablePersistAssignment(true);
+
+    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+        BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.SEMI_AUTO.name());
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
 
-    boolean result = ClusterStateVerifier.verifyByZkCallback(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
+    HelixClusterVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
+    Assert.assertTrue(verifier.verify());
+
+    IdealState idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+    verifySemiAutoMasterSlaveAssignment(idealState);
 
     // kill 1 node
     _participants[0].syncStop();
 
-    result = ClusterStateVerifier.verifyByZkCallback(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
+    Assert.assertTrue(verifier.verify());
 
-    IdealState idealState =
-        _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+    idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+    verifySemiAutoMasterSlaveAssignment(idealState);
 
-    Set<String> excludedInstances = new HashSet<String>();
-    excludedInstances.add(_participants[0].getInstanceName());
-    verifyAssignmentInIdealStateWithPersistDisabled(idealState, excludedInstances);
+    // disable an instance
+    _setupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), false);
+    idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+    verifySemiAutoMasterSlaveAssignment(idealState);
 
+    // clean up
     _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+    _setupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, _participants[1].getInstanceName(), true);
+    _participants[0].reset();
+    _participants[0].syncStart();
+  }
+
+  private void verifySemiAutoMasterSlaveAssignment(IdealState idealState) {
+    for (String partition : idealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(partition);
+      List<String> preferenceList = idealState.getPreferenceList(partition);
+      int numMaster = 0;
+
+      for (String ins : preferenceList) {
+        Assert.assertTrue(instanceStateMap.containsKey(ins));
+        String state = instanceStateMap.get(ins);
+        Assert.assertTrue(state.equals(MasterSlaveSMD.States.MASTER.name()) || state
+            .equals(MasterSlaveSMD.States.SLAVE.name()));
+        if (state.equals(MasterSlaveSMD.States.MASTER.name())) {
+          numMaster++;
+        }
+      }
+
+      Assert.assertEquals(numMaster, 1);
+    }
   }
 
   private void enablePersistAssignment(Boolean enable) {