You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/02 17:53:22 UTC
[2/9] helix git commit: Add test for SEMI_AUTO
Add test for SEMI_AUTO
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fb7496f0
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fb7496f0
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fb7496f0
Branch: refs/heads/helix-0.6.x
Commit: fb7496f0a176f8b645563809fc7035f3acc1b102
Parents: cff462f
Author: Junkai Xue <jx...@linkedin.com>
Authored: Sat Jan 28 16:33:17 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Sat Jan 28 16:33:17 2017 -0800
----------------------------------------------------------------------
.../integration/TestSemiAutoRebalance.java | 228 +++++++++++++++++++
1 file changed, 228 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/fb7496f0/helix-core/src/test/java/org/apache/helix/integration/TestSemiAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSemiAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSemiAutoRebalance.java
new file mode 100644
index 0000000..9a7ebdb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSemiAutoRebalance.java
@@ -0,0 +1,228 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestSemiAutoRebalance extends ZkIntegrationTestBase {
+ protected final String CLASS_NAME = getShortClassName();
+ protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+ protected static final int PARTICIPANT_NUMBER = 5;
+ protected static final int PARTICIPANT_START_PORT = 12918;
+
+ protected static final String DB_NAME = "TestDB";
+ protected static final int PARTITION_NUMBER = 20;
+ protected static final int REPLICA_NUMBER = 3;
+ protected static final String STATE_MODEL = "MasterSlave";
+
+ protected List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+ protected ClusterControllerManager _controller;
+
+ protected HelixDataAccessor _accessor;
+ protected PropertyKey.Builder _keyBuilder;
+
+ @BeforeClass
+ public void beforeClass()
+ throws InterruptedException {
+ System.out.println(
+ "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
+ }
+
+ // setup storage cluster
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, DB_NAME, PARTITION_NUMBER, STATE_MODEL,
+ IdealState.RebalanceMode.SEMI_AUTO.toString());
+
+ _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+ _keyBuilder = _accessor.keyBuilder();
+
+ List<String> instances = new ArrayList<String>();
+ for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+ String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i);
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+ instances.add(instance);
+ }
+
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, DB_NAME, REPLICA_NUMBER);
+
+ // start dummy participants
+ for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instances.get(i));
+ participant.syncStart();
+ _participants.add(participant);
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ Thread.sleep(1000);
+
+ // verify ideal state and external view
+ IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(DB_NAME));
+ Assert.assertNotNull(idealState);
+ Assert.assertEquals(idealState.getNumPartitions(), PARTITION_NUMBER);
+ for (String partition : idealState.getPartitionSet()) {
+ List<String> preferenceList = idealState.getPreferenceList(partition);
+ Assert.assertNotNull(preferenceList);
+ Assert.assertEquals(preferenceList.size(), REPLICA_NUMBER);
+ }
+
+ ExternalView externalView = _accessor.getProperty(_keyBuilder.externalView(DB_NAME));
+ Assert.assertNotNull(externalView);
+ Assert.assertEquals(externalView.getPartitionSet().size(), PARTITION_NUMBER);
+ for (String partition : externalView.getPartitionSet()) {
+ Map<String, String> stateMap = externalView.getStateMap(partition);
+ Assert.assertEquals(stateMap.size(), REPLICA_NUMBER);
+
+ int masters = 0;
+ for (String state : stateMap.values()) {
+ if (state.equals(MasterSlaveSMD.States.MASTER.name())) {
+ ++masters;
+ }
+ }
+ Assert.assertEquals(masters, 1);
+ }
+ }
+
+ @Test
+ public void testAddParticipant()
+ throws InterruptedException {
+ String newInstance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + _participants.size());
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newInstance);
+
+ MockParticipantManager newParticipant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newInstance);
+ newParticipant.syncStart();
+
+ Thread.sleep(1000);
+
+ List<String> instances = _accessor.getChildNames(_keyBuilder.instanceConfigs());
+ Assert.assertEquals(instances.size(), _participants.size() + 1);
+ Assert.assertTrue(instances.contains(newInstance));
+
+ List<String> liveInstances = _accessor.getChildNames(_keyBuilder.liveInstances());
+ Assert.assertEquals(liveInstances.size(), _participants.size() + 1);
+ Assert.assertTrue(liveInstances.contains(newInstance));
+
+ // nothing for new participant
+ ExternalView externalView = _accessor.getProperty(_keyBuilder.externalView(DB_NAME));
+ Assert.assertNotNull(externalView);
+ for (String partition : externalView.getPartitionSet()) {
+ Map<String, String> stateMap = externalView.getStateMap(partition);
+ Assert.assertFalse(stateMap.containsKey(newInstance));
+ }
+
+ // clear
+ newParticipant.syncStop();
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, newInstance, false);
+ _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, newInstance);
+
+ instances = _accessor.getChildNames(_keyBuilder.instanceConfigs());
+ Assert.assertEquals(instances.size(), _participants.size());
+
+ liveInstances = _accessor.getChildNames(_keyBuilder.liveInstances());
+ Assert.assertEquals(liveInstances.size(), _participants.size());
+ }
+
+ @Test(dependsOnMethods = "testAddParticipant")
+ public void testStopAndReStartParticipant()
+ throws InterruptedException {
+ MockParticipantManager participant = _participants.get(0);
+ String instance = participant.getInstanceName();
+
+ Map<String, MasterSlaveSMD.States> affectedPartitions =
+ new HashMap<String, MasterSlaveSMD.States>();
+
+ ExternalView externalView = _accessor.getProperty(_keyBuilder.externalView(DB_NAME));
+
+ for (String partition : externalView.getPartitionSet()) {
+ Map<String, String> stateMap = externalView.getStateMap(partition);
+ if (stateMap.containsKey(instance)) {
+ affectedPartitions.put(partition, MasterSlaveSMD.States.valueOf(stateMap.get(instance)));
+ }
+ }
+
+ stopParticipant(participant, affectedPartitions);
+
+ // create a new participant
+ participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instance);
+ _participants.set(0, participant);
+ startParticipant(participant, affectedPartitions);
+ }
+
+ private void stopParticipant(
+ MockParticipantManager participant, Map<String, MasterSlaveSMD.States> affectedPartitions)
+ throws InterruptedException {
+ participant.syncStop();
+
+ Thread.sleep(1000);
+
+ ExternalView externalView = _accessor.getProperty(_keyBuilder.externalView(DB_NAME));
+ // No re-assignment of partition, if a MASTER is removed, one of SLAVE would be prompted
+ for (Map.Entry<String, MasterSlaveSMD.States> entry : affectedPartitions.entrySet()) {
+ Map<String, String> stateMap = externalView.getStateMap(entry.getKey());
+ Assert.assertEquals(stateMap.size(), REPLICA_NUMBER - 1);
+ Assert.assertTrue(stateMap.values().contains(MasterSlaveSMD.States.MASTER.toString()));
+ }
+ }
+
+ private void startParticipant(
+ MockParticipantManager participant, Map<String, MasterSlaveSMD.States> affectedPartitions)
+ throws InterruptedException {
+ String instance = participant.getInstanceName();
+ participant.syncStart();
+
+ Thread.sleep(1000);
+
+ ExternalView externalView = _accessor.getProperty(_keyBuilder.externalView(DB_NAME));
+ // Everything back to the initial state
+ for (Map.Entry<String, MasterSlaveSMD.States> entry : affectedPartitions.entrySet()) {
+ Map<String, String> stateMap = externalView.getStateMap(entry.getKey());
+ Assert.assertEquals(stateMap.size(), REPLICA_NUMBER);
+
+ Assert.assertTrue(stateMap.containsKey(instance));
+ Assert.assertEquals(stateMap.get(instance), entry.getValue().toString());
+ }
+ }
+}