You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/23 02:22:21 UTC

git commit: [HELIX-274] Verify FULL_AUTO tagged node behavior, fix bug in AutoRebalancer, rb=14858

Updated Branches:
  refs/heads/master 422434240 -> f6e4c87e2


[HELIX-274] Verify FULL_AUTO tagged node behavior, fix bug in AutoRebalancer, rb=14858


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/f6e4c87e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/f6e4c87e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/f6e4c87e

Branch: refs/heads/master
Commit: f6e4c87e2cb146282ebf83003e784012ffc08ce5
Parents: 4224342
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 22 14:17:03 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Oct 22 17:21:45 2013 -0700

----------------------------------------------------------------------
 .../controller/rebalancer/AutoRebalancer.java   |  13 +-
 .../integration/TestFullAutoNodeTagging.java    | 314 +++++++++++++++++++
 2 files changed, 323 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f6e4c87e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 9564e35..3eb258b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -82,28 +82,33 @@ public class AutoRebalancer implements Rebalancer {
         ConstraintBasedAssignment.stateCount(stateModelDef, liveInstance.size(),
             Integer.parseInt(replicas));
     List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
+    List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
     Map<String, Map<String, String>> currentMapping =
         currentMapping(currentStateOutput, resource.getResourceName(), partitions, stateCountMap);
 
     // If there are nodes tagged with resource name, use only those nodes
     Set<String> taggedNodes = new HashSet<String>();
+    Set<String> taggedLiveNodes = new HashSet<String>();
     if (currentIdealState.getInstanceGroupTag() != null) {
-      for (String instanceName : liveNodes) {
+      for (String instanceName : allNodes) {
         if (clusterData.getInstanceConfigMap().get(instanceName)
             .containsTag(currentIdealState.getInstanceGroupTag())) {
           taggedNodes.add(instanceName);
+          if (liveInstance.containsKey(instanceName)) {
+            taggedLiveNodes.add(instanceName);
+          }
         }
       }
     }
     if (taggedNodes.size() > 0) {
       if (LOG.isInfoEnabled()) {
         LOG.info("found the following instances with tag " + currentIdealState.getResourceName()
-            + " " + taggedNodes);
+            + " " + taggedLiveNodes);
       }
-      liveNodes = new ArrayList<String>(taggedNodes);
+      allNodes = new ArrayList<String>(taggedNodes);
+      liveNodes = new ArrayList<String>(taggedLiveNodes);
     }
 
-    List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
 
     if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f6e4c87e/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
new file mode 100644
index 0000000..d815c80
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
@@ -0,0 +1,314 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Test that node tagging behaves correctly in FULL_AUTO mode
+ */
+public class TestFullAutoNodeTagging extends ZkUnitTestBase {
+  private static final Logger LOG = Logger.getLogger(TestFullAutoNodeTagging.class);
+
+  /**
+   * Basic test for tagging behavior. 10 participants, of which 4 are tagged. Launch all 10,
+   * checking external view every time a tagged node is started. Then shut down all 10, checking
+   * external view every time a tagged node is killed.
+   */
+  @Test
+  public void testSafeAssignment() throws Exception {
+    final int NUM_PARTICIPANTS = 10;
+    final int NUM_PARTITIONS = 4;
+    final int NUM_REPLICAS = 2;
+    final String RESOURCE_NAME = "TestDB0";
+    final String TAG = "ASSIGNABLE";
+
+    final String[] TAGGED_NODES = {
+        "localhost_12920", "localhost_12922", "localhost_12924", "localhost_12925"
+    };
+    Set<String> taggedNodes = Sets.newHashSet(TAGGED_NODES);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
+        true); // do rebalance
+
+    // tag the resource and participants
+    HelixAdmin helixAdmin = new ZKHelixAdmin(ZK_ADDR);
+    for (String taggedNode : TAGGED_NODES) {
+      helixAdmin.addInstanceTag(clusterName, taggedNode, TAG);
+    }
+    IdealState idealState = helixAdmin.getResourceIdealState(clusterName, RESOURCE_NAME);
+    idealState.setInstanceGroupTag(TAG);
+    helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+
+    // start controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      final String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+
+      // ensure that everything is valid if this is a tagged node that is starting
+      if (taggedNodes.contains(instanceName)) {
+        // make sure that the best possible matches the external view
+        Thread.sleep(500);
+        boolean result =
+            ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+        Assert.assertTrue(result);
+
+        // make sure that only tagged nodes are assigned and that they are still balanced
+        result =
+            ClusterStateVerifier.verifyByZkCallback(new TaggedZkVerifier(clusterName,
+                RESOURCE_NAME, TAGGED_NODES, false));
+        Assert.assertTrue(result, "invalid assignment after starting tagged node " + instanceName);
+      }
+    }
+
+    // cleanup: stop participants one at a time, verifying after each tagged node stops
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String participantName = participants[i].getInstanceName();
+      participants[i].syncStop();
+      if (taggedNodes.contains(participantName)) {
+        // check that the external view is still correct even after removing tagged nodes;
+        taggedNodes.remove(participantName);
+        Thread.sleep(500);
+        boolean result =
+            ClusterStateVerifier.verifyByZkCallback(new TaggedZkVerifier(clusterName,
+                RESOURCE_NAME, TAGGED_NODES, taggedNodes.isEmpty()));
+        Assert.assertTrue(result, "incorrect state after removing " + participantName + ", "
+            + taggedNodes + " remain");
+      }
+    }
+    controller.syncStop();
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Checker for basic validity of the external view given node tagging requirements: only
+   * tagged nodes may hold MASTER or SLAVE replicas, and the per-node partition, master, and
+   * slave counts must each stay close to their average.
+   */
+  private static class TaggedZkVerifier implements ZkVerifier {
+    private final String _clusterName;
+    private final String _resourceName;
+    private final Set<String> _taggedNodes;
+    private final boolean _isEmptyAllowed;
+    private final ZkClient _zkClient;
+
+    /**
+     * Create a verifier for a specific cluster and resource
+     * @param clusterName the cluster to verify
+     * @param resourceName the resource within the cluster to verify
+     * @param taggedNodes nodes tagged with the resource tag
+     * @param isEmptyAllowed true if empty assignments are legal
+     */
+    public TaggedZkVerifier(String clusterName, String resourceName, String[] taggedNodes,
+        boolean isEmptyAllowed) {
+      _clusterName = clusterName;
+      _resourceName = resourceName;
+      // copy once here instead of rebuilding the set on every verify() call
+      _taggedNodes = ImmutableSet.copyOf(taggedNodes);
+      _isEmptyAllowed = isEmptyAllowed;
+      _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+    }
+
+    /**
+     * Read the external view of the resource and check it against the tagging and balance rules
+     * @return true if the external view is acceptable, false otherwise (the reason is logged)
+     */
+    @Override
+    public boolean verify() {
+      BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+      HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
+      if (externalView == null) {
+        // no external view to inspect; treat it like an empty assignment instead of throwing
+        if (!_isEmptyAllowed) {
+          LOG.error("external view for " + _resourceName + " does not exist");
+        }
+        return _isEmptyAllowed;
+      }
+
+      // set up counts of partitions, masters, and slaves per node
+      Map<String, Integer> partitionCount = Maps.newHashMap();
+      Map<String, Integer> masterCount = Maps.newHashMap();
+      Map<String, Integer> slaveCount = Maps.newHashMap();
+
+      for (String partitionName : externalView.getPartitionSet()) {
+        Map<String, String> stateMap = externalView.getStateMap(partitionName);
+        for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+          String participantName = entry.getKey();
+          boolean isMaster = entry.getValue().equalsIgnoreCase("MASTER");
+          boolean isSlave = entry.getValue().equalsIgnoreCase("SLAVE");
+          if (!isMaster && !isSlave) {
+            // replicas in any other state are not counted as assignments
+            continue;
+          }
+          if (!_taggedNodes.contains(participantName)) {
+            // not allowed to have a non-tagged node assigned
+            LOG.error("Participant " + participantName
+                + " is not tagged, but has an assigned partition");
+            return false;
+          }
+          incrementCount(partitionCount, participantName);
+          incrementCount(isMaster ? masterCount : slaveCount, participantName);
+        }
+      }
+
+      // an empty slave map is always accepted, independent of _isEmptyAllowed
+      return isBalanced("partition", partitionCount, _isEmptyAllowed)
+          && isBalanced("master", masterCount, _isEmptyAllowed)
+          && isBalanced("slave", slaveCount, true);
+    }
+
+    /**
+     * Check one per-node count map: it must be non-empty unless empty is allowed, and every
+     * count must be close to the average
+     * @param label what is being counted, used as the prefix of the logged error
+     * @param countMap map of participant name to count
+     * @param isEmptyAllowed true if an empty map is acceptable
+     * @return true if the map passes both checks, false otherwise
+     */
+    private boolean isBalanced(String label, Map<String, Integer> countMap,
+        boolean isEmptyAllowed) {
+      if (countMap.isEmpty()) {
+        if (!isEmptyAllowed) {
+          LOG.error(label + " assignments are empty");
+        }
+        return isEmptyAllowed;
+      }
+      int total = 0;
+      for (int value : countMap.values()) {
+        total += value;
+      }
+      boolean dividesEvenly = total % countMap.size() == 0;
+      if (!withinAverage(countMap, isEmptyAllowed, dividesEvenly)) {
+        LOG.error(label + " counts deviate from average");
+        return false;
+      }
+      return true;
+    }
+
+    /** Add one to the count stored for a key, starting from zero if the key is absent */
+    private void incrementCount(Map<String, Integer> countMap, String key) {
+      Integer current = countMap.get(key);
+      countMap.put(key, current == null ? 1 : current + 1);
+    }
+
+    /**
+     * Check that every count lies between one below the average and one above it (two above
+     * when the total does not divide evenly); the average used is rounded down
+     */
+    private boolean withinAverage(Map<String, Integer> countMap, boolean isEmptyAllowed,
+        boolean dividesEvenly) {
+      if (countMap.size() == 0) {
+        if (!isEmptyAllowed) {
+          LOG.error("Map not allowed to be empty");
+          return false;
+        }
+        return true;
+      }
+      int upperBound = dividesEvenly ? 1 : 2;
+      int average = computeAverage(countMap);
+      for (String participantName : countMap.keySet()) {
+        int count = countMap.get(participantName);
+        if (count < average - 1 || count > average + upperBound) {
+          LOG.error("Count " + count + " for " + participantName + " too far from average of "
+              + average);
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /** Compute the integer (rounded down) average of the counts, or -1 for an empty map */
+    private int computeAverage(Map<String, Integer> countMap) {
+      if (countMap.size() == 0) {
+        return -1;
+      }
+      int total = 0;
+      for (int value : countMap.values()) {
+        total += value;
+      }
+      return total / countMap.size();
+    }
+
+    @Override
+    public ZkClient getZkClient() {
+      return _zkClient;
+    }
+
+    @Override
+    public String getClusterName() {
+      return _clusterName;
+    }
+  }
+}