You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:41 UTC

[33/53] [abbrv] [HELIX-18] Added more functionality to the new cluster setup

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
index 1f44e69..85330be 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.ParseException;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.RunningInstance;
@@ -52,6 +53,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.IdealState;
@@ -60,6 +62,7 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -288,6 +291,18 @@ public class NewClusterSetup {
 
   }
 
+  void rebalance(String[] optValues, String[] groupTagValues) {
+    String clusterName = optValues[0];
+    String resourceName = optValues[1];
+    int replicaCount = Integer.parseInt(optValues[2]);
+    String groupTag = null;
+    if (groupTagValues != null && groupTagValues.length > 0) {
+      groupTag = groupTagValues[0];
+    }
+    ResourceAccessor accessor = resourceAccessor(clusterName);
+    accessor.generateDefaultAssignment(ResourceId.from(resourceName), replicaCount, groupTag);
+  }
+
   void addInstance(String[] optValues) {
     String clusterName = optValues[0];
     String[] instanceIds = optValues[1].split(";");
@@ -651,6 +666,22 @@ public class NewClusterSetup {
     accessor.updateCluster(delta);
   }
 
+  void getConstraints(String[] optValues) {
+    String clusterName = optValues[0];
+    ConstraintType constraintType = ConstraintType.valueOf(optValues[1]);
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    ClusterConstraints constraints = accessor.readConstraints(constraintType);
+    System.out.println(constraints.toString());
+  }
+
+  void removeConstraint(String[] optValues) {
+    String clusterName = optValues[0];
+    ConstraintType constraintType = ConstraintType.valueOf(optValues[1]);
+    ConstraintId constraintId = ConstraintId.from(optValues[2]);
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    accessor.removeConstraint(constraintType, constraintId);
+  }
+
   void listClusterInfo(String[] optValues) {
     String clusterName = optValues[0];
     ClusterAccessor accessor = clusterAccessor(clusterName);
@@ -823,6 +854,106 @@ public class NewClusterSetup {
     userConfig.setMapField(partitionId.stringify(), fields);
   }
 
+  void swapParticipants(String[] optValues) {
+    String clusterName = optValues[0];
+    String oldParticipantName = optValues[1];
+    String newParticipantName = optValues[2];
+    ParticipantAccessor accessor = participantAccessor(clusterName);
+    accessor.swapParticipants(ParticipantId.from(oldParticipantName),
+        ParticipantId.from(newParticipantName));
+  }
+
+  void resetPartition(String[] optValues) {
+    String clusterName = optValues[0];
+    String participantName = optValues[1];
+    String resourceName = optValues[2];
+    String partitionName = optValues[3];
+
+    Set<PartitionId> partitionIds = ImmutableSet.of(PartitionId.from(partitionName));
+    ParticipantAccessor accessor = participantAccessor(clusterName);
+    accessor.resetPartitionsForParticipant(ParticipantId.from(participantName),
+        ResourceId.from(resourceName), partitionIds);
+  }
+
+  void resetResource(String[] optValues) {
+    String clusterName = optValues[0];
+    String resourceName = optValues[1];
+    Set<ResourceId> resourceIds = ImmutableSet.of(ResourceId.from(resourceName));
+    ResourceAccessor accessor = resourceAccessor(clusterName);
+    accessor.resetResources(resourceIds);
+  }
+
+  void resetParticipant(String[] optValues) {
+    String clusterName = optValues[0];
+    String participantName = optValues[1];
+    Set<ParticipantId> participantIds = ImmutableSet.of(ParticipantId.from(participantName));
+    ParticipantAccessor accessor = participantAccessor(clusterName);
+    accessor.resetParticipants(participantIds);
+  }
+
+  void addStat(String[] optValues) {
+    String clusterName = optValues[0];
+    String statName = optValues[1];
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    accessor.addStat(statName);
+  }
+
+  void dropStat(String[] optValues) {
+    String clusterName = optValues[0];
+    String statName = optValues[1];
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    accessor.dropStat(statName);
+  }
+
+  void addAlert(String[] optValues) {
+    String clusterName = optValues[0];
+    String alertName = optValues[1];
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    accessor.addAlert(alertName);
+  }
+
+  void dropAlert(String[] optValues) {
+    String clusterName = optValues[0];
+    String alertName = optValues[1];
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    accessor.dropAlert(alertName);
+  }
+
+  void expandResource(String[] optValues) {
+    String clusterName = optValues[0];
+    String resourceName = optValues[1];
+    expandResource(ClusterId.from(clusterName), ResourceId.from(resourceName));
+  }
+
+  void expandCluster(String[] optValues) {
+    String clusterName = optValues[0];
+    ClusterAccessor accessor = clusterAccessor(clusterName);
+    Cluster cluster = accessor.readCluster();
+    for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
+      expandResource(ClusterId.from(clusterName), resourceId);
+    }
+  }
+
+  private void expandResource(ClusterId clusterId, ResourceId resourceId) {
+    ResourceAccessor accessor = resourceAccessor(clusterId.stringify());
+    Resource resource = accessor.readResource(resourceId);
+    SemiAutoRebalancerContext context =
+        resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
+    if (context == null) {
+      LOG.info("Only SEMI_AUTO mode supported for resource expansion");
+      return;
+    }
+    if (context.anyLiveParticipant()) {
+      LOG.info("Resource uses ANY_LIVE_PARTICIPANT, skipping default assignment");
+      return;
+    }
+    if (context.getPreferenceLists().size() == 0) {
+      LOG.info("No preference lists have been set yet, skipping default assignment");
+      return;
+    }
+    accessor.generateDefaultAssignment(resourceId, -1, null);
+  }
+
   static int processCommandLineArgs(String[] cliArgs) {
     CommandLineParser cliParser = new GnuParser();
     Options cliOptions = constructCommandLineOptions();
@@ -909,19 +1040,24 @@ public class NewClusterSetup {
           setup.addIdealState(optValues);
           break;
         case swapInstance:
-          // TODO impl ClusterAccessor#swapParticipantsInCluster()
+          setup.swapParticipants(optValues);
           break;
         case dropInstance:
           setup.dropInstance(optValues);
           break;
         case rebalance:
-          // TODO impl this using ResourceAccessor
+          String[] groupTagValues = null;
+          if (cmd.hasOption(HelixOption.instanceGroupTag.name())) {
+            groupTagValues = cmd.getOptionValues(HelixOption.instanceGroupTag.name());
+            checkArgNum(HelixOption.instanceGroupTag, groupTagValues);
+          }
+          setup.rebalance(optValues, groupTagValues);
           break;
         case expandCluster:
-          // TODO impl this
+          setup.expandCluster(optValues);
           break;
         case expandResource:
-          // TODO impl this
+          setup.expandResource(optValues);
           break;
         case mode:
         case rebalancerMode:
@@ -930,9 +1066,11 @@ public class NewClusterSetup {
           // always used with addResource command
           continue;
         case instanceGroupTag:
-        case resourceKeyPrefix:
           // always used with rebalance command
           continue;
+        case resourceKeyPrefix:
+          throw new UnsupportedOperationException(HelixOption.resourceKeyPrefix
+              + " is not supported, please set partition names directly");
         case addResourceProperty:
           throw new UnsupportedOperationException(HelixOption.addResourceProperty
               + " is not supported, please use setConfig");
@@ -973,25 +1111,25 @@ public class NewClusterSetup {
           setup.enableCluster(optValues);
           break;
         case resetPartition:
-          // TODO impl ResoourceAccessor#resetPartitions()
+          setup.resetPartition(optValues);
           break;
         case resetInstance:
-          // TODO impl ParticipantAccessor#resetInstance()
+          setup.resetParticipant(optValues);
           break;
         case resetResource:
-          // TODO impl ResourceAccessor#resetResource()
+          setup.resetResource(optValues);
           break;
         case addStat:
-          // TODO impl ClusterAccessor.addStat()
+          setup.addStat(optValues);
           break;
         case addAlert:
-          // TODO impl ClusterAccessor#addAlert()
+          setup.addAlert(optValues);
           break;
         case dropStat:
-          // TODO impl ClusterAccessor.dropStat()
+          setup.dropStat(optValues);
           break;
         case dropAlert:
-          // TODO impl ClusterAccessor#dropAlert()
+          setup.dropAlert(optValues);
           break;
         case getConfig:
           setup.getConfig(optValues);
@@ -1003,11 +1141,13 @@ public class NewClusterSetup {
           setup.removeConfig(optValues);
           break;
         case getConstraints:
+          setup.getConstraints(optValues);
           break;
         case setConstraint:
           setup.setConstraint(optValues);
           break;
         case removeConstraint:
+          setup.removeConstraint(optValues);
           break;
         default:
           System.err.println("Non-recognized option: " + opt);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 652ac73..8de39ab 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -35,31 +35,20 @@ import java.util.TreeSet;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.api.HelixVersion;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.RunningInstance;
-import org.apache.helix.api.Scope;
 import org.apache.helix.api.State;
 import org.apache.helix.api.config.ClusterConfig;
-import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.MessageId;
 import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ProcId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 public class TestAutoRebalanceStrategy {
   private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class);
@@ -229,30 +218,22 @@ public class TestAutoRebalanceStrategy {
         ClusterConfig cluster =
             new ClusterConfig.Builder(ClusterId.from("cluster")).addStateModelDefinition(
                 _stateModelDef).build();
-        Map<ParticipantId, Participant> liveParticipantMap = Maps.newHashMap();
+        Set<ParticipantId> liveParticipantSet = Sets.newHashSet();
         for (String node : _liveNodes) {
-          Set<String> tags = Collections.emptySet();
-          Map<MessageId, Message> messageMap = Collections.emptyMap();
-          Set<PartitionId> disabledPartitionIdSet = Collections.emptySet();
-          Map<ResourceId, CurrentState> currentStateMap = Maps.newHashMap();
-          RunningInstance runningInstance =
-              new RunningInstance(SessionId.from("testSession"), HelixVersion.from("1.2.3.4"),
-                  ProcId.from("1234"));
-          Participant participant =
-              new Participant(ParticipantId.from(node), node, 0, true, disabledPartitionIdSet,
-                  tags, runningInstance, currentStateMap, messageMap, new UserConfig(
-                      Scope.participant(ParticipantId.from(node))));
-          liveParticipantMap.put(participant.getId(), participant);
+          liveParticipantSet.add(ParticipantId.from(node));
         }
         List<ParticipantId> preferenceList =
             IdealState.preferenceListFromStringList(listResult.get(partition));
         Set<ParticipantId> disabledParticipantsForPartition = Collections.emptySet();
         Map<ParticipantId, State> currentStateMap =
             IdealState.participantStateMapFromStringMap(rawCurStateMap);
+        Map<State, String> upperBounds =
+            NewConstraintBasedAssignment.stateConstraints(_stateModelDef,
+                ResourceId.from(RESOURCE_NAME), cluster);
         Map<ParticipantId, State> assignment =
-            NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster,
-                ResourceId.from(RESOURCE_NAME), liveParticipantMap, _stateModelDef, preferenceList,
-                currentStateMap, disabledParticipantsForPartition);
+            NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+                liveParticipantSet, _stateModelDef, preferenceList, currentStateMap,
+                disabledParticipantsForPartition);
         mapResult.put(partition, IdealState.stringMapFromParticipantStateMap(assignment));
       }
       return mapResult;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
index e07e7d7..e4943f8 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -259,10 +259,13 @@ public class TestNewAutoRebalanceStrategy {
         // compute the mapping
         Map<ParticipantId, State> replicaMap =
             ResourceAssignment.replicaMapFromStringMap(_currentMapping.get(partition));
+        Map<State, String> upperBounds =
+            NewConstraintBasedAssignment.stateConstraints(_stateModelDef,
+                ResourceId.from(RESOURCE_NAME), clusterConfig);
         Map<ParticipantId, State> assignment =
-            NewConstraintBasedAssignment.computeAutoBestStateForPartition(clusterConfig,
-                ResourceId.from(RESOURCE_NAME), liveParticipantMap, _stateModelDef,
-                participantPreferenceList, replicaMap, disabledParticipantsForPartition);
+            NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+                liveParticipantMap.keySet(), _stateModelDef, participantPreferenceList, replicaMap,
+                disabledParticipantsForPartition);
         mapResult.put(partitionId, assignment);
       }