You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/11/12 09:23:13 UTC

helix git commit: HELIX-543 RB-27808 Avoid moving partitions unnecessarily when auto-rebalancing

Repository: helix
Updated Branches:
  refs/heads/master e96ea8e20 -> dc9f129b6


HELIX-543 RB-27808 Avoid moving partitions unnecessarily when auto-rebalancing


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/dc9f129b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/dc9f129b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/dc9f129b

Branch: refs/heads/master
Commit: dc9f129b67f8cacdf0cd22288f166b56fc5654a0
Parents: e96ea8e
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Wed Nov 12 00:23:03 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Wed Nov 12 00:23:03 2014 -0800

----------------------------------------------------------------------
 helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy      |  18 ++++
 .../strategy/AutoRebalanceStrategy.java         |  61 +++++++++--
 .../strategy/TestAutoRebalanceStrategy.java     |  62 +++++++++++
 .../SinglePartitionLeaderStandByTest.java       | 108 +++++++++++++++++++
 4 files changed, 242 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy
----------------------------------------------------------------------
diff --git a/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy b/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy
index f59be07..ef1f57e 100644
--- a/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy
+++ b/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy
@@ -1,4 +1,22 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
 <ivy-module version="2.0" xmlns:m="http://ant.apache.org/ivy/maven">
 	<info organisation="org.apache.helix"
 		module="helix-agent"

http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 09b66c1..6e0e226 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -33,6 +33,9 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimaps;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.State;
@@ -141,8 +144,12 @@ public class AutoRebalanceStrategy {
       final Map<PartitionId, Map<ParticipantId, State>> currentMapping,
       final List<ParticipantId> allNodes) {
     Comparator<ParticipantId> nodeComparator = new NodeComparator();
+    Comparator<ParticipantId> currentStateNodeComparator =
+        new CurrentStateNodeComparator(currentMapping);
+
     List<ParticipantId> sortedLiveNodes = new ArrayList<ParticipantId>(liveNodes);
-    Collections.sort(sortedLiveNodes, nodeComparator);
+    Collections.sort(sortedLiveNodes, currentStateNodeComparator);
+
     List<ParticipantId> sortedAllNodes = new ArrayList<ParticipantId>(allNodes);
     Collections.sort(sortedAllNodes, nodeComparator);
     List<String> sortedNodeNames =
@@ -481,7 +488,7 @@ public class AutoRebalanceStrategy {
   }
 
   /**
-   * Safe check for the number of replicas of a given id assiged to a node
+   * Safe check for the number of replicas of a given id assigned to a node
    * @param state the state to assign
    * @param node the node to check
    * @param nodeReplicaCounts a map of node to replica id and counts
@@ -496,11 +503,11 @@ public class AutoRebalanceStrategy {
       return 0;
     }
     Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
-    if (!replicaCounts.containsKey(state)) {
+    if (!replicaCounts.containsKey(state.toString())) {
       replicaCounts.put(state.toString(), 0);
       return 0;
     }
-    return replicaCounts.get(state);
+    return replicaCounts.get(state.toString());
   }
 
   /**
@@ -609,7 +616,7 @@ public class AutoRebalanceStrategy {
   /**
    * Given a predefined set of all possible nodes, compute an assignment of replicas to
    * nodes that evenly assigns all replicas to nodes.
-   * @param allNodes Identifiers to all nodes, live and non-live
+   * @param nodeNames Identifiers of all nodes, live and non-live
    * @return Preferred assignment of replicas
    */
   private Map<Replica, Node> computePreferredPlacement(final List<String> nodeNames) {
@@ -633,8 +640,7 @@ public class AutoRebalanceStrategy {
 
   /**
    * Counts the total number of replicas given a state-count mapping
-   * @param states
-   * @return
+   * @return the total number of replicas across all states
    */
   private int countStateReplicas() {
     int total = 0;
@@ -844,4 +850,45 @@ public class AutoRebalanceStrategy {
       return o1.toString().compareTo(o2.toString());
     }
   }
+
+  /**
+   * Sorter for live nodes that sorts firstly according to the number of partitions currently
+   * registered against a node (more partitions means sort earlier), then by node name.
+   * This prevents unnecessarily moving partitions due to the capacity assignment
+   * unnecessarily reducing the capacity of lower down elements.
+   */
+  private static class CurrentStateNodeComparator implements Comparator<ParticipantId> {
+
+    /**
+     * The number of partitions that are active for each partition.
+     */
+    private final Map<ParticipantId, Integer> partitionCounts;
+
+    /**
+     * Create it.
+     * @param currentMapping The current mapping of partitions to participants.
+     */
+    public CurrentStateNodeComparator(Map<PartitionId, Map<ParticipantId, State>> currentMapping) {
+      partitionCounts = new HashMap<ParticipantId, Integer>();
+      for (Entry<PartitionId, Map<ParticipantId, State>> entry : currentMapping.entrySet()) {
+        for (ParticipantId participantId : entry.getValue().keySet()) {
+          Integer existing = partitionCounts.get(participantId);
+          partitionCounts.put(participantId, existing != null ? existing + 1 : 1);
+        }
+      }
+    }
+
+    @Override
+    public int compare(ParticipantId o1, ParticipantId o2) {
+      Integer c1 = partitionCounts.get(o1);
+      if (c1 == null) {
+        c1 = 0;
+      }
+      Integer c2 = partitionCounts.get(o2);
+      if (c2 == null) {
+        c2 = 0;
+      }
+      return c1 < c2 ? 1 : (c1 > c2 ? -1 : o1.toString().compareTo(o2.toString()));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 1322b40..25c550d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -788,4 +788,66 @@ public class TestAutoRebalanceStrategy {
     }
     Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
   }
+
+  /**
+   * Tests the following scenario: a resource has a single partition and two nodes are up, so
+   * the partition is assigned to one of them. If that node goes down the partition moves to the
+   * other node; when the node comes back, the partition should not move back unnecessarily.
+   * The outage is simulated by reporting the replica on the non-preferred node in the current
+   * mapping and recomputing the assignment with both nodes live.
+   */
+  @Test
+  public void testWontMoveSinglePartitionUnnecessarily() {
+    final ResourceId RESOURCE = ResourceId.from("resource");
+    final PartitionId partition = PartitionId.from("resource_0");
+    LinkedHashMap<State, Integer> stateCount = Maps.newLinkedHashMap();
+    stateCount.put(State.from("ONLINE"), 1);
+    final ParticipantId[] NODES = {
+        ParticipantId.from("n0"), ParticipantId.from("n1")
+    };
+
+    ReplicaPlacementScheme scheme = new AutoRebalanceStrategy.DefaultPlacementScheme();
+    // initial state: both nodes live, partition not yet placed anywhere
+    List<ParticipantId> allNodes = Lists.newArrayList(NODES);
+    List<ParticipantId> liveNodes = Lists.newArrayList(NODES);
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
+    currentMapping.put(partition, new HashMap<ParticipantId, State>());
+
+    List<PartitionId> partitions = Lists.newArrayList(partition);
+    // first pass: the partition must be placed on exactly one of the two nodes
+    ZNRecord znRecord =
+        new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme)
+            .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
+    ParticipantId preferredNode = assertSingleOnlineReplica(znRecord, partition);
+    ParticipantId otherNode = preferredNode.equals(NODES[0]) ? NODES[1] : NODES[0];
+
+    // Simulate the partition having moved to the other node while the preferred node was down,
+    // then recompute with both nodes live again.
+    currentMapping.get(partition).put(otherNode, State.from("ONLINE"));
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme)
+            .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
+    ParticipantId finalPreferredNode = assertSingleOnlineReplica(znRecord, partition);
+
+    // finally, make sure we haven't moved it.
+    Assert.assertEquals(finalPreferredNode, otherNode, "partition moved unnecessarily");
+  }
+
+  /**
+   * Asserts that the computed assignment places exactly one ONLINE replica of the partition and
+   * returns the node it was placed on.
+   * @param assignment assignment computed by AutoRebalanceStrategy
+   * @param partition the partition to check
+   * @return the single node in the partition's preference list
+   */
+  private static ParticipantId assertSingleOnlineReplica(ZNRecord assignment,
+      PartitionId partition) {
+    List<String> preferenceList = assignment.getListFields().get(partition.toString());
+    Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+    Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
+    String node = preferenceList.get(0);
+    String state = assignment.getMapField(partition.toString()).get(node);
+    Assert.assertEquals(state, "ONLINE", "Invalid state for " + partition);
+    return ParticipantId.from(node);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
new file mode 100644
index 0000000..ec8de0a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
@@ -0,0 +1,108 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.MockController;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.testutil.TestUtil;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for a single LeaderStandby resource rebalanced in FULL_AUTO mode with an
+ * ANY_LIVEINSTANCE preference list: checks that the cluster converges, that it converges again
+ * after one participant is stopped, and once more after that participant is restarted.
+ */
+public class SinglePartitionLeaderStandByTest extends ZkTestBase {
+  @Test
+  public void test() throws Exception {
+    String clusterName = TestUtil.getTestName();
+    int n = 4;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        2, // partitions per resource
+        n, // number of nodes
+        0, // replicas
+        "LeaderStandby",
+        RebalanceMode.FULL_AUTO,
+        false); // don't rebalance
+
+    // rebalance ideal-state to use ANY_LIVEINSTANCE for preference list
+    // NOTE(review): 2 partitions are requested above but only TestDB0_0 is set here; confirm
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey key = keyBuilder.idealStates("TestDB0");
+    IdealState idealState = accessor.getProperty(key);
+    idealState.setReplicas(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString());
+    idealState.getRecord().setListField("TestDB0_0",
+        Arrays.asList(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString()));
+    accessor.setProperty(key, idealState);
+
+    MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+    Assert.assertTrue(verifyClusterState(clusterName), "cluster did not converge on startup");
+
+    // stop the first participant; verify convergence instead of sleeping a fixed interval
+    participants[0].syncStop();
+    Assert.assertTrue(verifyClusterState(clusterName), "cluster did not converge after stop");
+
+    // bring the first participant back and check the cluster converges again
+    participants[0] = new MockParticipant(_zkaddr, clusterName, "localhost_12918");
+    participants[0].syncStart();
+    Assert.assertTrue(verifyClusterState(clusterName), "cluster did not converge after restart");
+
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /** Checks via ClusterStateVerifier that best possible state and external view agree. */
+  private boolean verifyClusterState(String clusterName) {
+    return ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, clusterName));
+  }
+}