You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/30 02:37:20 UTC
git commit: [HELIX-280] Full auto rebalancer should check resource
tag first, rb=14931
Updated Branches:
refs/heads/helix-0.6.2-release 4383512d0 -> 962701dfd
[HELIX-280] Full auto rebalancer should check resource tag first, rb=14931
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/962701df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/962701df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/962701df
Branch: refs/heads/helix-0.6.2-release
Commit: 962701dfd24cc0785458cbe3d0ff2089fe79ab0f
Parents: 4383512
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 29 18:29:32 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Oct 29 18:35:32 2013 -0700
----------------------------------------------------------------------
.../controller/rebalancer/AutoRebalancer.java | 20 ++-
.../integration/TestFullAutoNodeTagging.java | 124 +++++++++++++++++++
2 files changed, 139 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/962701df/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 7017de3..5a832ce 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -101,11 +101,21 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
}
}
}
- }
- if (taggedNodes.size() > 0) {
- if (LOG.isInfoEnabled()) {
- LOG.info("found the following instances with tag " + currentIdealState.getResourceName()
- + " " + taggedLiveNodes);
+ if (!taggedLiveNodes.isEmpty()) {
+ // live nodes exist that have this tag
+ if (LOG.isInfoEnabled()) {
+ LOG.info("found the following participants with tag "
+ + currentIdealState.getInstanceGroupTag() + " for " + resourceName + ": "
+ + taggedLiveNodes);
+ }
+ } else if (taggedNodes.isEmpty()) {
+ // no live nodes and no configured nodes have this tag
+ LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag()
+ + " but no configured participants have this tag");
+ } else {
+ // configured nodes have this tag, but no live nodes have this tag
+ LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag()
+ + " but no live participants have this tag");
}
allNodes = new ArrayList<String>(taggedNodes);
liveNodes = new ArrayList<String>(taggedLiveNodes);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/962701df/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
index d815c80..e0c8b6f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration;
*/
import java.util.Date;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -36,6 +37,7 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
@@ -58,6 +60,65 @@ public class TestFullAutoNodeTagging extends ZkUnitTestBase {
private static final Logger LOG = Logger.getLogger(TestFullAutoNodeTagging.class);
/**
+ * Ensure that no assignments happen when there are no tagged nodes, but the resource is tagged
+ */
+ @Test
+ public void testResourceTaggedFirst() throws Exception {
+ final int NUM_PARTICIPANTS = 10;
+ final int NUM_PARTITIONS = 4;
+ final int NUM_REPLICAS = 2;
+ final String RESOURCE_NAME = "TestDB0";
+ final String TAG = "ASSIGNABLE";
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ // Set up cluster
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ NUM_PARTITIONS, // partitions per resource
+ NUM_PARTICIPANTS, // number of nodes
+ NUM_REPLICAS, // replicas
+ "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
+ true); // do rebalance
+
+ // tag the resource
+ HelixAdmin helixAdmin = new ZKHelixAdmin(ZK_ADDR);
+ IdealState idealState = helixAdmin.getResourceIdealState(clusterName, RESOURCE_NAME);
+ idealState.setInstanceGroupTag(TAG);
+ helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+
+ // start controller
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start participants
+ MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+ for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+ final String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ Thread.sleep(1000);
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new EmptyZkVerifier(clusterName, RESOURCE_NAME));
+ Assert.assertTrue(result, "External view and current state must be empty");
+
+ // cleanup
+ for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+ participants[i].syncStop();
+ }
+ controller.syncStop();
+ }
+
+ /**
* Basic test for tagging behavior. 10 participants, of which 4 are tagged. Launch all 10,
* checking external view every time a tagged node is started. Then shut down all 10, checking
* external view every time a tagged node is killed.
@@ -311,4 +372,67 @@ public class TestFullAutoNodeTagging extends ZkUnitTestBase {
return _clusterName;
}
}
+
+ /**
+ * Ensures that external view and current state are empty
+ */
+ private static class EmptyZkVerifier implements ZkVerifier {
+ private final String _clusterName;
+ private final String _resourceName;
+ private final ZkClient _zkClient;
+
+ /**
+ * Instantiate the verifier
+ * @param clusterName the cluster to verify
+ * @param resourceName the resource to verify
+ */
+ public EmptyZkVerifier(String clusterName, String resourceName) {
+ _clusterName = clusterName;
+ _resourceName = resourceName;
+ _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+ }
+
+ @Override
+ public boolean verify() {
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
+
+ // verify external view empty
+ for (String partition : externalView.getPartitionSet()) {
+ Map<String, String> stateMap = externalView.getStateMap(partition);
+ if (stateMap != null && !stateMap.isEmpty()) {
+ LOG.error("External view not empty for " + partition);
+ return false;
+ }
+ }
+
+ // verify current state empty
+ List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances());
+ for (String participant : liveParticipants) {
+ List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant));
+ for (String sessionId : sessionIds) {
+ CurrentState currentState =
+ accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName));
+ Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
+ if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
+ LOG.error("Current state not empty for " + participant);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public ZkClient getZkClient() {
+ return _zkClient;
+ }
+
+ @Override
+ public String getClusterName() {
+ return _clusterName;
+ }
+ }
}