You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 17:59:46 UTC
[11/38] helix git commit: Add support for flexible hierarchy
representation of a cluster topology.
Add support for flexible hierarchy representation of a cluster topology.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/215039b3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/215039b3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/215039b3
Branch: refs/heads/helix-0.6.x
Commit: 215039b374ba1457d892d5f0c7ad758b393ed23a
Parents: 981d0e2
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Jul 1 10:20:50 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Sun Feb 5 19:04:29 2017 -0800
----------------------------------------------------------------------
.../strategy/CrushRebalanceStrategy.java | 1 +
.../MultiRoundCrushRebalanceStrategy.java | 3 +-
.../org/apache/helix/model/ClusterConfig.java | 4 +-
.../org/apache/helix/model/InstanceConfig.java | 6 +
.../helix/controller/Strategy/TestTopology.java | 172 +++++++++++++++++++
5 files changed, 183 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/215039b3/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
index b91d26c..e9a39a4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.rebalancer.strategy;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
import org.apache.helix.util.JenkinsHash;
http://git-wip-us.apache.org/repos/asf/helix/blob/215039b3/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
index 93bd980..dc59596 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
@@ -77,7 +77,8 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy {
final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
ClusterDataCache clusterData) throws HelixException {
Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
- _clusterTopo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
+ _clusterTopo =
+ new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
Node root = _clusterTopo.getRootNode();
Map<String, List<Node>> zoneMapping = new HashMap<String, List<Node>>();
http://git-wip-us.apache.org/repos/asf/helix/blob/215039b3/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 1ed7cf9..7b30fd7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -32,8 +32,8 @@ public class ClusterConfig extends HelixProperty {
public enum ClusterConfigProperty {
HELIX_DISABLE_PIPELINE_TRIGGERS,
TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance"
- FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition.
- PERSIST_BEST_POSSIBLE_ASSIGNMENT
+ PERSIST_BEST_POSSIBLE_ASSIGNMENT,
+ FAULT_ZONE_TYPE // the type in which isolation should be applied on when Helix places the replicas from same partition.
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/215039b3/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index ecf2900..52edaa7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -98,6 +98,12 @@ public class InstanceConfig extends HelixProperty {
_record.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port);
}
+ /**
+ * Get the zone identifier for this instance.
+ * @return the value of the HELIX_ZONE_ID simple field stored in this instance's record
+ * @deprecated please use domain to set hierarchy tag for an instance.
+ */
+ @Deprecated
public String getZoneId() {
return _record.getSimpleField(InstanceConfigProperty.HELIX_ZONE_ID.name());
}
http://git-wip-us.apache.org/repos/asf/helix/blob/215039b3/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestTopology.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestTopology.java b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestTopology.java
new file mode 100644
index 0000000..185c2ca
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestTopology.java
@@ -0,0 +1,172 @@
+package org.apache.helix.controller.Strategy;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/** Verifies the node tree, fault zones and aggregated weights that {@link Topology} builds. */
+public class TestTopology {
+  private static final Logger logger = Logger.getLogger(TestTopology.class);
+
+  /** Adds {@code weight} to the total recorded under {@code key}; a missing key counts as 0. */
+  private static void addWeight(Map<String, Integer> totals, String key, int weight) {
+    Integer current = totals.get(key);
+    totals.put(key, (current == null ? 0 : current) + weight);
+  }
+
+  /** Explicit "/Rack/Sub-Rack/Host/Instance" topology with "Sub-Rack" as the fault zone type. */
+  @Test
+  public void testCreateClusterTopology() {
+    ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster");
+
+    String topology = "/Rack/Sub-Rack/Host/Instance";
+    clusterConfig.getRecord().getSimpleFields()
+        .put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), topology);
+    clusterConfig.getRecord().getSimpleFields()
+        .put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "Sub-Rack");
+
+    List<String> allNodes = new ArrayList<String>();
+    List<String> liveNodes = new ArrayList<String>();
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
+
+    // Expected total weight of every rack and sub-rack, keyed by node name.
+    Map<String, Integer> nodeToWeightMap = new HashMap<String, Integer>();
+
+    for (int i = 0; i < 100; i++) {
+      String instance = "localhost_" + i;
+      InstanceConfig config = new InstanceConfig(instance);
+      String rackId = "rack_" + i / 25;
+      String subRackId = "subrack-" + i / 5;
+
+      String domain =
+          String.format("Rack=%s, Sub-Rack=%s, Host=%s", rackId, subRackId, instance);
+      config.setDomain(domain);
+      config.setHostName(instance);
+      config.setPort("9000");
+      allNodes.add(instance);
+
+      // Every 10th instance is left out of liveNodes and is expected to contribute no weight.
+      int weight = 0;
+      if (i % 10 != 0) {
+        liveNodes.add(instance);
+        // No explicit weight: the test expects 1000 (presumably Topology's default - confirm).
+        weight = 1000;
+        if (i % 3 == 0) {
+          // Give every third instance an explicit, index-derived weight.
+          weight = (i + 1) * 100;
+          config.setWeight(weight);
+        }
+      }
+
+      instanceConfigMap.put(instance, config);
+      addWeight(nodeToWeightMap, rackId, weight);
+      addWeight(nodeToWeightMap, subRackId, weight);
+    }
+
+    Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig);
+
+    Assert.assertEquals(topo.getEndNodeType(), "Instance");
+    Assert.assertEquals(topo.getFaultZoneType(), "Sub-Rack");
+
+    List<Node> faultZones = topo.getFaultZones();
+    Assert.assertEquals(faultZones.size(), 20);
+
+    Node root = topo.getRootNode();
+    Assert.assertEquals(root.getChildrenCount("Rack"), 4);
+    Assert.assertEquals(root.getChildrenCount("Sub-Rack"), 20);
+    Assert.assertEquals(root.getChildrenCount("Host"), 100);
+    Assert.assertEquals(root.getChildrenCount("Instance"), 100);
+
+    // validate weights.
+    for (Node rack : root.getChildren()) {
+      Assert.assertEquals(rack.getWeight(), (long) nodeToWeightMap.get(rack.getName()));
+      for (Node subRack : rack.getChildren()) {
+        Assert.assertEquals(subRack.getWeight(), (long) nodeToWeightMap.get(subRack.getName()));
+      }
+    }
+  }
+
+  /** No TOPOLOGY or FAULT_ZONE_TYPE configured: expects the ZONE / INSTANCE default types. */
+  @Test
+  public void testCreateClusterTopologyWithDefaultTopology() {
+    ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster");
+
+    List<String> allNodes = new ArrayList<String>();
+    List<String> liveNodes = new ArrayList<String>();
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
+
+    // Expected total weight of every zone, keyed by zone id.
+    Map<String, Integer> nodeToWeightMap = new HashMap<String, Integer>();
+
+    for (int i = 0; i < 100; i++) {
+      String instance = "localhost_" + i;
+      InstanceConfig config = new InstanceConfig(instance);
+      String zoneId = "rack_" + i / 10;
+      config.setZoneId(zoneId);
+      config.setHostName(instance);
+      config.setPort("9000");
+      allNodes.add(instance);
+
+      // Every 10th instance is left out of liveNodes and is expected to contribute no weight.
+      int weight = 0;
+      if (i % 10 != 0) {
+        liveNodes.add(instance);
+        weight = 1000;
+        if (i % 3 == 0) {
+          // Give every third instance an explicit, index-derived weight.
+          weight = (i + 1) * 100;
+          config.setWeight(weight);
+        }
+      }
+
+      instanceConfigMap.put(instance, config);
+      addWeight(nodeToWeightMap, zoneId, weight);
+    }
+
+    Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig);
+
+    Assert.assertEquals(topo.getEndNodeType(), Topology.Types.INSTANCE.name());
+    Assert.assertEquals(topo.getFaultZoneType(), Topology.Types.ZONE.name());
+
+    List<Node> faultZones = topo.getFaultZones();
+    Assert.assertEquals(faultZones.size(), 10);
+
+    Node root = topo.getRootNode();
+    Assert.assertEquals(root.getChildrenCount(Topology.Types.ZONE.name()), 10);
+    Assert.assertEquals(root.getChildrenCount(topo.getEndNodeType()), 100);
+
+    // validate weights.
+    for (Node rack : root.getChildren()) {
+      Assert.assertEquals(rack.getWeight(), (long) nodeToWeightMap.get(rack.getName()));
+    }
+  }
+}