You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[34/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/com/linkedin/helix/controller/stages/DummyClusterManager.java
deleted file mode 100644
index 42f38df..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/controller/stages/DummyClusterManager.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import com.linkedin.helix.ClusterMessagingService;
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigChangeListener;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.CurrentStateChangeListener;
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.HealthStateChangeListener;
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.IdealStateChangeListener;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.PreConnectCallback;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.healthcheck.ParticipantHealthReportCollector;
-import com.linkedin.helix.participant.StateMachineEngine;
-import com.linkedin.helix.store.PropertyStore;
-import com.linkedin.helix.store.zk.ZkHelixPropertyStore;
-
-public class DummyClusterManager implements HelixManager
-{
- HelixDataAccessor _accessor;
- String _clusterName;
- String _sessionId;
-
- public DummyClusterManager(String clusterName, HelixDataAccessor accessor)
- {
- _clusterName = clusterName;
- _accessor = accessor;
- _sessionId = "session_" + clusterName;
- }
-
- @Override
- public void connect() throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public boolean isConnected()
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public void disconnect()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void addConfigChangeListener(ConfigChangeListener listener) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void addMessageListener(MessageListener listener, String instanceName) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
- String instanceName,
- String sessionId) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public boolean removeListener(Object listener)
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public DataAccessor getDataAccessor()
- {
- return null;
- }
-
- @Override
- public String getClusterName()
- {
- return _clusterName;
- }
-
- @Override
- public String getInstanceName()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public String getSessionId()
- {
- return _sessionId;
- }
-
- @Override
- public long getLastNotificationTime()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public void addControllerListener(ControllerChangeListener listener)
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public HelixAdmin getClusterManagmentTool()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public PropertyStore<ZNRecord> getPropertyStore()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public ClusterMessagingService getMessagingService()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public ParticipantHealthReportCollector getHealthReportCollector()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public InstanceType getInstanceType()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public String getVersion()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void addHealthStateChangeListener(HealthStateChangeListener listener,
- String instanceName) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public StateMachineEngine getStateMachineEngine()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public boolean isLeader()
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public ConfigAccessor getConfigAccessor()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void startTimerTasks()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void stopTimerTasks()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public HelixDataAccessor getHelixDataAccessor()
- {
- return _accessor;
- }
-
- @Override
- public void addPreConnectCallback(PreConnectCallback callback)
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
- {
- // TODO Auto-generated method stub
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestBestPossibleStateCalcStage.java
deleted file mode 100644
index 1ae821c..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Date;
-import java.util.Map;
-
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.controller.stages.AttributeName;
-import com.linkedin.helix.controller.stages.BestPossibleStateCalcStage;
-import com.linkedin.helix.controller.stages.BestPossibleStateOutput;
-import com.linkedin.helix.controller.stages.CurrentStateOutput;
-import com.linkedin.helix.controller.stages.ReadClusterDataStage;
-import com.linkedin.helix.model.Resource;
-import com.linkedin.helix.model.Partition;
-
-public class TestBestPossibleStateCalcStage extends BaseStageTest
-{
- @Test
- public void testSimple()
- {
- System.out.println("START TestBestPossibleStateCalcStage at " + new Date(System.currentTimeMillis()));
-// List<IdealState> idealStates = new ArrayList<IdealState>();
-
- String[] resources = new String[]{ "testResourceName" };
- setupIdealState(5, resources, 10, 1);
- setupLiveInstances(5);
- setupStateModel();
-
- Map<String, Resource> resourceMap = getResourceMap();
- CurrentStateOutput currentStateOutput = new CurrentStateOutput();
- event.addAttribute(AttributeName.RESOURCES.toString(),
- resourceMap);
- event.addAttribute(AttributeName.CURRENT_STATE.toString(),
- currentStateOutput);
-
- ReadClusterDataStage stage1 = new ReadClusterDataStage();
- runStage(event, stage1);
- BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
- runStage(event, stage2);
-
- BestPossibleStateOutput output = event
- .getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
- for (int p = 0; p < 5; p++)
- {
- Partition resource = new Partition("testResourceName_" + p);
- AssertJUnit.assertEquals(
- "MASTER",
- output.getInstanceStateMap("testResourceName", resource).get(
- "localhost_" + (p + 1) % 5));
- }
- System.out.println("END TestBestPossibleStateCalcStage at " + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestClusterEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestClusterEvent.java b/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestClusterEvent.java
deleted file mode 100644
index 4b45473..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestClusterEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import org.testng.annotations.Test;
-import org.testng.AssertJUnit;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.controller.stages.ClusterEvent;
-@Test
-public class TestClusterEvent
-{
- @Test
- public void testSimplePutandGet(){
- ClusterEvent event = new ClusterEvent("name");
- AssertJUnit.assertEquals(event.getName(), "name");
- event.addAttribute("attr1", "value");
- AssertJUnit.assertEquals(event.getAttribute("attr1"), "value");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestCompatibilityCheckStage.java b/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestCompatibilityCheckStage.java
deleted file mode 100644
index 26d1dfc..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestCompatibilityCheckStage.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.Mocks;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.pipeline.StageContext;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.LiveInstance.LiveInstanceProperty;
-import com.linkedin.helix.tools.IdealStateCalculatorForStorageNode;
-
-public class TestCompatibilityCheckStage extends BaseStageTest
-{
- private void prepare(String controllerVersion, String participantVersion)
- {
- List<String> instances = Arrays.asList("localhost_0", "localhost_1",
- "localhost_2", "localhost_3", "localhost_4");
- int partitions = 10;
- int replicas = 1;
-
- // set ideal state
- String resourceName = "testResource";
- ZNRecord record = IdealStateCalculatorForStorageNode.calculateIdealState(
- instances, partitions, replicas, resourceName, "MASTER", "SLAVE");
- IdealState idealState = new IdealState(record);
- idealState.setStateModelDefRef("MasterSlave");
-
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
-
- // set live instances
- record = new ZNRecord("localhost_0");
- if (participantVersion != null)
- {
- record.setSimpleField(LiveInstanceProperty.HELIX_VERSION.toString(), participantVersion);
- }
- LiveInstance liveInstance = new LiveInstance(record);
- liveInstance.setSessionId("session_0");
- accessor.setProperty(keyBuilder.liveInstance("localhost_0"), liveInstance);
-
- if (controllerVersion != null)
- {
- ((Mocks.MockManager)manager).setVersion(controllerVersion);
- }
- event.addAttribute("helixmanager", manager);
- runStage(event, new ReadClusterDataStage());
- }
-
- @Test
- public void testCompatible()
- {
- prepare("0.4.0", "0.4.0");
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
- StageContext context = new StageContext();
- stage.init(context);
- stage.preProcess();
- try
- {
- stage.process(event);
- }
- catch (Exception e)
- {
- Assert.fail("Should not fail since versions are compatible");
- }
- stage.postProcess();
- }
-
- @Test
- public void testNullParticipantVersion()
- {
- prepare("0.4.0", null);
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
- StageContext context = new StageContext();
- stage.init(context);
- stage.preProcess();
- try
- {
- stage.process(event);
- }
- catch (Exception e)
- {
- Assert.fail("Should not fail since only participant version is null");
- }
- stage.postProcess();
- }
-
- @Test
- public void testNullControllerVersion()
- {
- prepare(null, "0.4.0");
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
- StageContext context = new StageContext();
- stage.init(context);
- stage.preProcess();
- try
- {
- stage.process(event);
- Assert.fail("Should fail since controller version is null");
- }
- catch (Exception e)
- {
- // OK
- }
- stage.postProcess();
- }
-
- @Test
- public void testControllerVersionLessThanParticipantVersion()
- {
- prepare("0.2.12", "0.3.4");
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
- StageContext context = new StageContext();
- stage.init(context);
- stage.preProcess();
- try
- {
- stage.process(event);
- Assert.fail("Should fail since controller primary version is less than participant primary version");
- }
- catch (Exception e)
- {
- // OK
- }
- stage.postProcess();
- }
-
- @Test
- public void testIncompatible()
- {
- prepare("0.4.12", "0.3.4");
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
- StageContext context = new StageContext();
- stage.init(context);
- stage.preProcess();
- try
- {
- stage.process(event);
- Assert.fail("Should fail since controller primary version is incompatible with participant primary version");
- }
- catch (Exception e)
- {
- // OK
- }
- stage.postProcess();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestCurrentStateComputationStage.java
deleted file mode 100644
index 5b15e42..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestCurrentStateComputationStage.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Map;
-
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-
-public class TestCurrentStateComputationStage extends BaseStageTest
-{
-
- @Test
- public void testEmptyCS()
- {
- Map<String, Resource> resourceMap = getResourceMap();
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- CurrentStateComputationStage stage = new CurrentStateComputationStage();
- runStage(event, new ReadClusterDataStage());
- runStage(event, stage);
- CurrentStateOutput output =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- AssertJUnit.assertEquals(output.getCurrentStateMap("testResourceName",
- new Partition("testResourceName_0"))
- .size(),
- 0);
- }
-
- @Test
- public void testSimpleCS()
- {
- // setup resource
- Map<String, Resource> resourceMap = getResourceMap();
-
- setupLiveInstances(5);
-
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- CurrentStateComputationStage stage = new CurrentStateComputationStage();
- runStage(event, new ReadClusterDataStage());
- runStage(event, stage);
- CurrentStateOutput output1 =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- AssertJUnit.assertEquals(output1.getCurrentStateMap("testResourceName",
- new Partition("testResourceName_0"))
- .size(),
- 0);
-
- // Add a state transition messages
- Message message = new Message(Message.MessageType.STATE_TRANSITION, "msg1");
- message.setFromState("OFFLINE");
- message.setToState("SLAVE");
- message.setResourceName("testResourceName");
- message.setPartitionName("testResourceName_1");
- message.setTgtName("localhost_3");
- message.setTgtSessionId("session_3");
-
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.message("localhost_" + 3, message.getId()), message);
-
- runStage(event, new ReadClusterDataStage());
- runStage(event, stage);
- CurrentStateOutput output2 =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- String pendingState =
- output2.getPendingState("testResourceName",
- new Partition("testResourceName_1"),
- "localhost_3");
- AssertJUnit.assertEquals(pendingState, "SLAVE");
-
- ZNRecord record1 = new ZNRecord("testResourceName");
- // Add a current state that matches sessionId and one that does not match
- CurrentState stateWithLiveSession = new CurrentState(record1);
- stateWithLiveSession.setSessionId("session_3");
- stateWithLiveSession.setStateModelDefRef("MasterSlave");
- stateWithLiveSession.setState("testResourceName_1", "OFFLINE");
- ZNRecord record2 = new ZNRecord("testResourceName");
- CurrentState stateWithDeadSession = new CurrentState(record2);
- stateWithDeadSession.setSessionId("session_dead");
- stateWithDeadSession.setStateModelDefRef("MasterSlave");
- stateWithDeadSession.setState("testResourceName_1", "MASTER");
-
- accessor.setProperty(keyBuilder.currentState("localhost_3",
- "session_3",
- "testResourceName"),
- stateWithLiveSession);
- accessor.setProperty(keyBuilder.currentState("localhost_3",
- "session_dead",
- "testResourceName"),
- stateWithDeadSession);
- runStage(event, new ReadClusterDataStage());
- runStage(event, stage);
- CurrentStateOutput output3 =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- String currentState =
- output3.getCurrentState("testResourceName",
- new Partition("testResourceName_1"),
- "localhost_3");
- AssertJUnit.assertEquals(currentState, "OFFLINE");
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestMessageThrottleStage.java
deleted file mode 100644
index f1cba2e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestMessageThrottleStage.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.controller.pipeline.Pipeline;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.model.ClusterConstraints;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintAttribute;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintItem;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintType;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.model.Partition;
-
-public class TestMessageThrottleStage extends ZkUnitTestBase
-{
- private static final Logger LOG =
- Logger.getLogger(TestMessageThrottleStage.class.getName());
- final String _className = getShortClassName();
-
- @Test
- public void testMsgThrottleBasic() throws Exception
- {
- String clusterName = "CLUSTER_" + _className + "_basic";
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
- HelixManager manager = new DummyClusterManager(clusterName, accessor);
-
- // ideal state: node0 is MASTER, node1 is SLAVE
- // replica=2 means 1 master and 1 slave
- setupIdealState(clusterName, new int[] { 0, 1 }, new String[] { "TestDB" }, 1, 2);
- setupLiveInstances(clusterName, new int[] { 0, 1 });
- setupStateModel(clusterName);
-
- ClusterEvent event = new ClusterEvent("testEvent");
- event.addAttribute("helixmanager", manager);
-
- MessageThrottleStage throttleStage = new MessageThrottleStage();
- try
- {
- runStage(event, throttleStage);
- Assert.fail("Should throw exception since DATA_CACHE is null");
- }
- catch (Exception e)
- {
- // OK
- }
-
- Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
- runPipeline(event, dataRefresh);
-
- try
- {
- runStage(event, throttleStage);
- Assert.fail("Should throw exception since RESOURCE is null");
- }
- catch (Exception e)
- {
- // OK
- }
- runStage(event, new ResourceComputationStage());
-
- try
- {
- runStage(event, throttleStage);
- Assert.fail("Should throw exception since MESSAGE_SELECT is null");
- }
- catch (Exception e)
- {
- // OK
- }
- MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
- List<Message> selectMessages = new ArrayList<Message>();
- Message msg =
- createMessage(MessageType.STATE_TRANSITION,
- "msgId-001",
- "OFFLINE",
- "SLAVE",
- "TestDB",
- "localhost_0");
- selectMessages.add(msg);
-
- msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
- event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
-
- runStage(event, throttleStage);
-
- MessageThrottleStageOutput msgThrottleOutput =
- event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
- Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"))
- .size(),
- 1);
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-
- @Test()
- public void testMsgThrottleConstraints() throws Exception
- {
- String clusterName = "CLUSTER_" + _className + "_constraints";
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
- HelixManager manager = new DummyClusterManager(clusterName, accessor);
-
- // ideal state: node0 is MASTER, node1 is SLAVE
- // replica=2 means 1 master and 1 slave
- setupIdealState(clusterName, new int[] { 0, 1 }, new String[] { "TestDB" }, 1, 2);
- setupLiveInstances(clusterName, new int[] { 0, 1 });
- setupStateModel(clusterName);
-
- // setup constraints
- ZNRecord record = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString());
-
- // constraint0:
- // "MESSAGE_TYPE=STATE_TRANSITION,CONSTRAINT_VALUE=ANY"
- record.setMapField("constraint0", new TreeMap<String, String>());
- record.getMapField("constraint0").put("MESSAGE_TYPE", "STATE_TRANSITION");
- record.getMapField("constraint0").put("CONSTRAINT_VALUE", "ANY");
- ConstraintItem constraint0 = new ConstraintItem(record.getMapField("constraint0"));
-
- // constraint1:
- // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,CONSTRAINT_VALUE=ANY"
- record.setMapField("constraint1", new TreeMap<String, String>());
- record.getMapField("constraint1").put("MESSAGE_TYPE", "STATE_TRANSITION");
- record.getMapField("constraint1").put("TRANSITION", "OFFLINE-SLAVE");
- record.getMapField("constraint1").put("CONSTRAINT_VALUE", "50");
- ConstraintItem constraint1 = new ConstraintItem(record.getMapField("constraint1"));
-
- // constraint2:
- // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=.*,RESOURCE=TestDB,CONSTRAINT_VALUE=2";
- record.setMapField("constraint2", new TreeMap<String, String>());
- record.getMapField("constraint2").put("MESSAGE_TYPE", "STATE_TRANSITION");
- record.getMapField("constraint2").put("TRANSITION", "OFFLINE-SLAVE");
- record.getMapField("constraint2").put("INSTANCE", ".*");
- record.getMapField("constraint2").put("RESOURCE", "TestDB");
- record.getMapField("constraint2").put("CONSTRAINT_VALUE", "2");
- ConstraintItem constraint2 = new ConstraintItem(record.getMapField("constraint2"));
-
- // constraint3:
- // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=localhost_12918,RESOURCE=.*,CONSTRAINT_VALUE=1";
- record.setMapField("constraint3", new TreeMap<String, String>());
- record.getMapField("constraint3").put("MESSAGE_TYPE", "STATE_TRANSITION");
- record.getMapField("constraint3").put("TRANSITION", "OFFLINE-SLAVE");
- record.getMapField("constraint3").put("INSTANCE", "localhost_1");
- record.getMapField("constraint3").put("RESOURCE", ".*");
- record.getMapField("constraint3").put("CONSTRAINT_VALUE", "1");
- ConstraintItem constraint3 = new ConstraintItem(record.getMapField("constraint3"));
-
- // constraint4:
- // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=.*,RESOURCE=.*,CONSTRAINT_VALUE=10"
- record.setMapField("constraint4", new TreeMap<String, String>());
- record.getMapField("constraint4").put("MESSAGE_TYPE", "STATE_TRANSITION");
- record.getMapField("constraint4").put("TRANSITION", "OFFLINE-SLAVE");
- record.getMapField("constraint4").put("INSTANCE", ".*");
- record.getMapField("constraint4").put("RESOURCE", ".*");
- record.getMapField("constraint4").put("CONSTRAINT_VALUE", "10");
- ConstraintItem constraint4 = new ConstraintItem(record.getMapField("constraint4"));
-
- // constraint5:
- // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=localhost_12918,RESOURCE=TestDB,CONSTRAINT_VALUE=5"
- record.setMapField("constraint5", new TreeMap<String, String>());
- record.getMapField("constraint5").put("MESSAGE_TYPE", "STATE_TRANSITION");
- record.getMapField("constraint5").put("TRANSITION", "OFFLINE-SLAVE");
- record.getMapField("constraint5").put("INSTANCE", "localhost_0");
- record.getMapField("constraint5").put("RESOURCE", "TestDB");
- record.getMapField("constraint5").put("CONSTRAINT_VALUE", "3");
- ConstraintItem constraint5 = new ConstraintItem(record.getMapField("constraint5"));
-
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()),
- new ClusterConstraints(record));
-
- // ClusterConstraints constraint =
- // accessor.getProperty(ClusterConstraints.class,
- // PropertyType.CONFIGS,
- // ConfigScopeProperty.CONSTRAINT.toString(),
- // ConstraintType.MESSAGE_CONSTRAINT.toString());
- ClusterConstraints constraint =
- accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
-
- MessageThrottleStage throttleStage = new MessageThrottleStage();
-
- // test constraintSelection
- // message1: hit contraintSelection rule1 and rule2
- Message msg1 =
- createMessage(MessageType.STATE_TRANSITION,
- "msgId-001",
- "OFFLINE",
- "SLAVE",
- "TestDB",
- "localhost_0");
-
- Map<ConstraintAttribute, String> msgAttr =
- ClusterConstraints.toConstraintAttributes(msg1);
- Set<ConstraintItem> matches = constraint.match(msgAttr);
- System.out.println(msg1 + " matches(" + matches.size() + "): " + matches);
- Assert.assertEquals(matches.size(), 5);
- Assert.assertTrue(containsConstraint(matches, constraint0));
- Assert.assertTrue(containsConstraint(matches, constraint1));
- Assert.assertTrue(containsConstraint(matches, constraint2));
- Assert.assertTrue(containsConstraint(matches, constraint4));
- Assert.assertTrue(containsConstraint(matches, constraint5));
-
- matches = throttleStage.selectConstraints(matches, msgAttr);
- System.out.println(msg1 + " matches(" + matches.size() + "): " + matches);
- Assert.assertEquals(matches.size(), 2);
- Assert.assertTrue(containsConstraint(matches, constraint1));
- Assert.assertTrue(containsConstraint(matches, constraint5));
-
- // message2: hit contraintSelection rule1, rule2, and rule3
- Message msg2 =
- createMessage(MessageType.STATE_TRANSITION,
- "msgId-002",
- "OFFLINE",
- "SLAVE",
- "TestDB",
- "localhost_1");
-
- msgAttr = ClusterConstraints.toConstraintAttributes(msg2);
- matches = constraint.match(msgAttr);
- System.out.println(msg2 + " matches(" + matches.size() + "): " + matches);
- Assert.assertEquals(matches.size(), 5);
- Assert.assertTrue(containsConstraint(matches, constraint0));
- Assert.assertTrue(containsConstraint(matches, constraint1));
- Assert.assertTrue(containsConstraint(matches, constraint2));
- Assert.assertTrue(containsConstraint(matches, constraint3));
- Assert.assertTrue(containsConstraint(matches, constraint4));
-
- matches = throttleStage.selectConstraints(matches, msgAttr);
- System.out.println(msg2 + " matches(" + matches.size() + "): " + matches);
- Assert.assertEquals(matches.size(), 2);
- Assert.assertTrue(containsConstraint(matches, constraint1));
- Assert.assertTrue(containsConstraint(matches, constraint3));
-
- // test messageThrottleStage
- ClusterEvent event = new ClusterEvent("testEvent");
- event.addAttribute("helixmanager", manager);
-
- Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
- runPipeline(event, dataRefresh);
- runStage(event, new ResourceComputationStage());
- MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
-
- Message msg3 =
- createMessage(MessageType.STATE_TRANSITION,
- "msgId-003",
- "OFFLINE",
- "SLAVE",
- "TestDB",
- "localhost_0");
-
- Message msg4 =
- createMessage(MessageType.STATE_TRANSITION,
- "msgId-004",
- "OFFLINE",
- "SLAVE",
- "TestDB",
- "localhost_0");
-
- Message msg5 =
- createMessage(MessageType.STATE_TRANSITION,
- "msgId-005",
- "OFFLINE",
- "SLAVE",
- "TestDB",
- "localhost_0");
-
- Message msg6 =
- createMessage(MessageType.STATE_TRANSITION,
- "msgId-006",
- "OFFLINE",
- "SLAVE",
- "TestDB",
- "localhost_1");
-
- List<Message> selectMessages = new ArrayList<Message>();
- selectMessages.add(msg1);
- selectMessages.add(msg2);
- selectMessages.add(msg3);
- selectMessages.add(msg4);
- selectMessages.add(msg5); // should be throttled
- selectMessages.add(msg6); // should be throttled
-
- msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
- event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
-
- runStage(event, throttleStage);
-
- MessageThrottleStageOutput msgThrottleOutput =
- event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
- List<Message> throttleMessages =
- msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"));
- Assert.assertEquals(throttleMessages.size(), 4);
- Assert.assertTrue(throttleMessages.contains(msg1));
- Assert.assertTrue(throttleMessages.contains(msg2));
- Assert.assertTrue(throttleMessages.contains(msg3));
- Assert.assertTrue(throttleMessages.contains(msg4));
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-
- private boolean containsConstraint(Set<ConstraintItem> constraints,
- ConstraintItem constraint)
- {
- for (ConstraintItem item : constraints)
- {
- if (item.toString().equals(constraint.toString()))
- {
- return true;
- }
- }
- return false;
- }
-
- // add pending message test case
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestMsgSelectionStage.java
deleted file mode 100644
index b66442a..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestMsgSelectionStage.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.controller.stages.MessageSelectionStage.Bounds;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-
-public class TestMsgSelectionStage
-{
- @Test
- public void testMasterXfer()
- {
- System.out.println("START testMasterXfer at " + new Date(System.currentTimeMillis()));
-
- Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
- liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
- liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
-
- Map<String, String> currentStates = new HashMap<String, String>();
- currentStates.put("localhost_0", "SLAVE");
- currentStates.put("localhost_1", "MASTER");
-
- Map<String, String> pendingStates = new HashMap<String, String>();
-
- List<Message> messages = new ArrayList<Message>();
- messages.add(TestHelper.createMessage("msgId_0",
- "SLAVE",
- "MASTER",
- "localhost_0",
- "TestDB",
- "TestDB_0"));
- messages.add(TestHelper.createMessage("msgId_1",
- "MASTER",
- "SLAVE",
- "localhost_1",
- "TestDB",
- "TestDB_0"));
-
- Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
- stateConstraints.put("MASTER", new Bounds(0, 1));
- stateConstraints.put("SLAVE", new Bounds(0, 2));
-
- Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
- stateTransitionPriorities.put("MASTER-SLAVE", 0);
- stateTransitionPriorities.put("SLAVE-MASTER", 1);
-
-
- List<Message> selectedMsg =
- new MessageSelectionStage().selectMessages(liveInstances,
- currentStates,
- pendingStates,
- messages,
- stateConstraints,
- stateTransitionPriorities,
- "OFFLINE");
-
- Assert.assertEquals(selectedMsg.size(), 1);
- Assert.assertEquals(selectedMsg.get(0).getMsgId(), "msgId_1");
- System.out.println("END testMasterXfer at " + new Date(System.currentTimeMillis()));
- }
-
- @Test
- public void testMasterXferAfterMasterResume()
- {
- System.out.println("START testMasterXferAfterMasterResume at "
- + new Date(System.currentTimeMillis()));
-
- Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
- liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
- liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
-
- Map<String, String> currentStates = new HashMap<String, String>();
- currentStates.put("localhost_0", "SLAVE");
- currentStates.put("localhost_1", "SLAVE");
-
- Map<String, String> pendingStates = new HashMap<String, String>();
- pendingStates.put("localhost_1", "MASTER");
-
- List<Message> messages = new ArrayList<Message>();
- messages.add(TestHelper.createMessage("msgId_0",
- "SLAVE",
- "MASTER",
- "localhost_0",
- "TestDB",
- "TestDB_0"));
-
- Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
- stateConstraints.put("MASTER", new Bounds(0, 1));
- stateConstraints.put("SLAVE", new Bounds(0, 2));
-
- Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
- stateTransitionPriorities.put("MASTER-SLAVE", 0);
- stateTransitionPriorities.put("SLAVE-MASTER", 1);
-
- List<Message> selectedMsg =
- new MessageSelectionStage().selectMessages(liveInstances,
- currentStates,
- pendingStates,
- messages,
- stateConstraints,
- stateTransitionPriorities,
- "OFFLINE");
-
- Assert.assertEquals(selectedMsg.size(), 0);
- System.out.println("END testMasterXferAfterMasterResume at "
- + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestParseInfoFromAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestParseInfoFromAlert.java b/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestParseInfoFromAlert.java
deleted file mode 100644
index e1c1c1d..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestParseInfoFromAlert.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.linkedin.helix.controller.stages;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.integration.ZkStandAloneCMTestBase;
-
-public class TestParseInfoFromAlert extends ZkStandAloneCMTestBase
-{
- @Test
- public void TestParse()
- {
- StatsAggregationStage stage = new StatsAggregationStage();
- String controllerName = CONTROLLER_PREFIX + "_0";
- HelixManager manager = _startCMResultMap.get(controllerName)._manager;
-
- String instanceName = stage.parseInstanceName("localhost_12918.TestStat@DB=123.latency", manager);
- Assert.assertTrue(instanceName.equals("localhost_12918"));
-
- instanceName = stage.parseInstanceName("localhost_12955.TestStat@DB=123.latency", manager);
- Assert.assertTrue(instanceName == null);
-
-
- instanceName = stage.parseInstanceName("localhost_12922.TestStat@DB=123.latency", manager);
- Assert.assertTrue(instanceName.equals("localhost_12922"));
-
-
-
- String resourceName = stage.parseResourceName("localhost_12918.TestStat@DB=TestDB.latency", manager);
- Assert.assertTrue(resourceName.equals("TestDB"));
-
-
- String partitionName = stage.parsePartitionName("localhost_12918.TestStat@DB=TestDB;Partition=TestDB_22.latency", manager);
- Assert.assertTrue(partitionName.equals("TestDB_22"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestRebalancePipeline.java
deleted file mode 100644
index 7f6e74e..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestRebalancePipeline.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZkUnitTestBase;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.controller.pipeline.Pipeline;
-import com.linkedin.helix.manager.zk.ZKHelixAdmin;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.Attributes;
-import com.linkedin.helix.model.Partition;
-
-public class TestRebalancePipeline extends ZkUnitTestBase
-{
- private static final Logger LOG =
- Logger.getLogger(TestRebalancePipeline.class.getName());
- final String _className = getShortClassName();
-
- @Test
- public void testDuplicateMsg()
- {
- String clusterName = "CLUSTER_" + _className + "_dup";
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
-
- HelixManager manager = new DummyClusterManager(clusterName, accessor);
- ClusterEvent event = new ClusterEvent("testEvent");
- event.addAttribute("helixmanager", manager);
-
- final String resourceName = "testResource_dup";
- String[] resourceGroups = new String[] { resourceName };
- // ideal state: node0 is MASTER, node1 is SLAVE
- // replica=2 means 1 master and 1 slave
- setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
- setupLiveInstances(clusterName, new int[] { 0, 1 });
- setupStateModel(clusterName);
-
- // cluster data cache refresh pipeline
- Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
-
- // rebalance pipeline
- Pipeline rebalancePipeline = new Pipeline();
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new MessageGenerationPhase());
- rebalancePipeline.addStage(new MessageSelectionStage());
- rebalancePipeline.addStage(new MessageThrottleStage());
- rebalancePipeline.addStage(new TaskAssignmentStage());
-
- // round1: set node0 currentState to OFFLINE and node1 currentState to OFFLINE
- setCurrentState(clusterName,
- "localhost_0",
- resourceName,
- resourceName + "_0",
- "session_0",
- "OFFLINE");
- setCurrentState(clusterName,
- "localhost_1",
- resourceName,
- resourceName + "_0",
- "session_1",
- "SLAVE");
-
- runPipeline(event, dataRefresh);
- runPipeline(event, rebalancePipeline);
- MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- List<Message> messages =
- msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
- Assert.assertEquals(messages.size(),
- 1,
- "Should output 1 message: OFFLINE-SLAVE for node0");
- Message message = messages.get(0);
- Assert.assertEquals(message.getFromState(), "OFFLINE");
- Assert.assertEquals(message.getToState(), "SLAVE");
- Assert.assertEquals(message.getTgtName(), "localhost_0");
-
- // round2: updates node0 currentState to SLAVE but keep the
- // message, make sure controller should not send S->M until removal is done
- setCurrentState(clusterName,
- "localhost_0",
- resourceName,
- resourceName + "_0",
- "session_1",
- "SLAVE");
-
- runPipeline(event, dataRefresh);
- runPipeline(event, rebalancePipeline);
- msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
- Assert.assertEquals(messages.size(),
- 0,
- "Should NOT output 1 message: SLAVE-MASTER for node1");
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-
- @Test
- public void testMsgTriggeredRebalance() throws Exception
- {
- String clusterName = "CLUSTER_" + _className + "_msgTrigger";
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
- HelixManager manager = new DummyClusterManager(clusterName, accessor);
- ClusterEvent event = new ClusterEvent("testEvent");
-
- final String resourceName = "testResource_dup";
- String[] resourceGroups = new String[] { resourceName };
-
- TestHelper.setupEmptyCluster(_gZkClient, clusterName);
-
- // ideal state: node0 is MASTER, node1 is SLAVE
- // replica=2 means 1 master and 1 slave
- setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
- setupStateModel(clusterName);
- setupInstances(clusterName, new int[] { 0, 1 });
- setupLiveInstances(clusterName, new int[] { 0, 1 });
-
- TestHelper.startController(clusterName,
- "controller_0",
- ZK_ADDR,
- HelixControllerMain.STANDALONE);
-
- // round1: controller sends O->S to both node0 and node1
- Thread.sleep(1000);
-
- Builder keyBuilder = accessor.keyBuilder();
- List<String> messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
- Assert.assertEquals(messages.size(), 1);
- messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
- Assert.assertEquals(messages.size(), 1);
-
- // round2: node0 and node1 update current states but not removing messages
- // controller's rebalance pipeline should be triggered but since messages are not
- // removed
- // no new messages will be sent
- setCurrentState(clusterName,
- "localhost_0",
- resourceName,
- resourceName + "_0",
- "session_0",
- "SLAVE");
- setCurrentState(clusterName,
- "localhost_1",
- resourceName,
- resourceName + "_0",
- "session_1",
- "SLAVE");
- Thread.sleep(1000);
- messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
- Assert.assertEquals(messages.size(), 1);
-
- messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
- Assert.assertEquals(messages.size(), 1);
-
- // round3: node0 removes message and controller's rebalance pipeline should be
- // triggered
- // and sends S->M to node0
- messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
- accessor.removeProperty(keyBuilder.message("localhost_0", messages.get(0)));
- Thread.sleep(1000);
-
- messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
- Assert.assertEquals(messages.size(), 1);
- ZNRecord msg =
- accessor.getProperty(keyBuilder.message("localhost_0", messages.get(0)))
- .getRecord();
- String toState = msg.getSimpleField(Attributes.TO_STATE.toString());
- Assert.assertEquals(toState, "MASTER");
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-
- @Test
- public void testChangeIdealStateWithPendingMsg()
- {
- String clusterName = "CLUSTER_" + _className + "_pending";
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
- HelixManager manager = new DummyClusterManager(clusterName, accessor);
- ClusterEvent event = new ClusterEvent("testEvent");
- event.addAttribute("helixmanager", manager);
-
- final String resourceName = "testResource_pending";
- String[] resourceGroups = new String[] { resourceName };
- // ideal state: node0 is MASTER, node1 is SLAVE
- // replica=2 means 1 master and 1 slave
- setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
- setupLiveInstances(clusterName, new int[] { 0, 1 });
- setupStateModel(clusterName);
-
- // cluster data cache refresh pipeline
- Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
-
- // rebalance pipeline
- Pipeline rebalancePipeline = new Pipeline();
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new MessageGenerationPhase());
- rebalancePipeline.addStage(new MessageSelectionStage());
- rebalancePipeline.addStage(new MessageThrottleStage());
- rebalancePipeline.addStage(new TaskAssignmentStage());
-
- // round1: set node0 currentState to OFFLINE and node1 currentState to SLAVE
- setCurrentState(clusterName,
- "localhost_0",
- resourceName,
- resourceName + "_0",
- "session_0",
- "OFFLINE");
- setCurrentState(clusterName,
- "localhost_1",
- resourceName,
- resourceName + "_0",
- "session_1",
- "SLAVE");
-
- runPipeline(event, dataRefresh);
- runPipeline(event, rebalancePipeline);
- MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- List<Message> messages =
- msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
- Assert.assertEquals(messages.size(),
- 1,
- "Should output 1 message: OFFLINE-SLAVE for node0");
- Message message = messages.get(0);
- Assert.assertEquals(message.getFromState(), "OFFLINE");
- Assert.assertEquals(message.getToState(), "SLAVE");
- Assert.assertEquals(message.getTgtName(), "localhost_0");
-
- // round2: drop resource, but keep the
- // message, make sure controller should not send O->DROPPEDN until O->S is done
- HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
- admin.dropResource(clusterName, resourceName);
-
- runPipeline(event, dataRefresh);
- runPipeline(event, rebalancePipeline);
- msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
- Assert.assertEquals(messages.size(),
- 1,
- "Should output only 1 message: OFFLINE->DROPPED for localhost_1");
-
- message = messages.get(0);
- Assert.assertEquals(message.getFromState(), "SLAVE");
- Assert.assertEquals(message.getToState(), "OFFLINE");
- Assert.assertEquals(message.getTgtName(), "localhost_1");
-
- // round3: remove O->S for localhost_0, controller should now send O->DROPPED to
- // localhost_0
- Builder keyBuilder = accessor.keyBuilder();
- List<String> msgIds = accessor.getChildNames(keyBuilder.messages("localhost_0"));
- accessor.removeProperty(keyBuilder.message("localhost_0", msgIds.get(0)));
- runPipeline(event, dataRefresh);
- runPipeline(event, rebalancePipeline);
- msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
- Assert.assertEquals(messages.size(),
- 1,
- "Should output 1 message: OFFLINE->DROPPED for localhost_0");
- message = messages.get(0);
- Assert.assertEquals(message.getFromState(), "OFFLINE");
- Assert.assertEquals(message.getToState(), "DROPPED");
- Assert.assertEquals(message.getTgtName(), "localhost_0");
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-
- @Test
- public void testMasterXfer()
- {
- String clusterName = "CLUSTER_" + _className + "_xfer";
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
- HelixManager manager = new DummyClusterManager(clusterName, accessor);
- ClusterEvent event = new ClusterEvent("testEvent");
- event.addAttribute("helixmanager", manager);
-
- final String resourceName = "testResource_xfer";
- String[] resourceGroups = new String[] { resourceName };
- // ideal state: node0 is MASTER, node1 is SLAVE
- // replica=2 means 1 master and 1 slave
- setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
- setupLiveInstances(clusterName, new int[] { 1 });
- setupStateModel(clusterName);
-
- // cluster data cache refresh pipeline
- Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
-
- // rebalance pipeline
- Pipeline rebalancePipeline = new Pipeline();
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new MessageGenerationPhase());
- rebalancePipeline.addStage(new MessageSelectionStage());
- rebalancePipeline.addStage(new MessageThrottleStage());
- rebalancePipeline.addStage(new TaskAssignmentStage());
-
- // round1: set node1 currentState to SLAVE
- setCurrentState(clusterName,
- "localhost_1",
- resourceName,
- resourceName + "_0",
- "session_1",
- "SLAVE");
-
- runPipeline(event, dataRefresh);
- runPipeline(event, rebalancePipeline);
- MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- List<Message> messages =
- msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
- Assert.assertEquals(messages.size(),
- 1,
- "Should output 1 message: SLAVE-MASTER for node1");
- Message message = messages.get(0);
- Assert.assertEquals(message.getFromState(), "SLAVE");
- Assert.assertEquals(message.getToState(), "MASTER");
- Assert.assertEquals(message.getTgtName(), "localhost_1");
-
- // round2: updates node0 currentState to SLAVE but keep the
- // message, make sure controller should not send S->M until removal is done
- setupLiveInstances(clusterName, new int[] { 0 });
- setCurrentState(clusterName,
- "localhost_0",
- resourceName,
- resourceName + "_0",
- "session_0",
- "SLAVE");
-
- runPipeline(event, dataRefresh);
- runPipeline(event, rebalancePipeline);
- msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
- Assert.assertEquals(messages.size(),
- 0,
- "Should NOT output 1 message: SLAVE-MASTER for node0");
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-
- protected void setCurrentState(String clusterName,
- String instance,
- String resourceGroupName,
- String resourceKey,
- String sessionId,
- String state)
- {
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- CurrentState curState = new CurrentState(resourceGroupName);
- curState.setState(resourceKey, state);
- curState.setSessionId(sessionId);
- curState.setStateModelDefRef("MasterSlave");
- accessor.setProperty(keyBuilder.currentState(instance, sessionId, resourceGroupName),
- curState);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestResourceComputationStage.java
deleted file mode 100644
index d8a1b4b..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/controller/stages/TestResourceComputationStage.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.pipeline.StageContext;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Resource;
-import com.linkedin.helix.tools.IdealStateCalculatorForStorageNode;
-
-public class TestResourceComputationStage extends BaseStageTest
-{
- /**
- * Case where we have one resource in IdealState
- *
- * @throws Exception
- */
- @Test
- public void testSimple() throws Exception
- {
- int nodes = 5;
- List<String> instances = new ArrayList<String>();
- for (int i = 0; i < nodes; i++)
- {
- instances.add("localhost_" + i);
- }
- int partitions = 10;
- int replicas = 1;
- String resourceName = "testResource";
- ZNRecord record = IdealStateCalculatorForStorageNode.calculateIdealState(
- instances, partitions, replicas, resourceName, "MASTER", "SLAVE");
- IdealState idealState = new IdealState(record);
- idealState.setStateModelDefRef("MasterSlave");
-
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.idealStates(resourceName),
- idealState);
- ResourceComputationStage stage = new ResourceComputationStage();
- runStage(event, new ReadClusterDataStage());
- runStage(event, stage);
-
- Map<String, Resource> resource = event
- .getAttribute(AttributeName.RESOURCES.toString());
- AssertJUnit.assertEquals(1, resource.size());
-
- AssertJUnit.assertEquals(resource.keySet().iterator().next(),
- resourceName);
- AssertJUnit.assertEquals(resource.values().iterator().next()
- .getResourceName(), resourceName);
- AssertJUnit.assertEquals(resource.values().iterator().next()
- .getStateModelDefRef(), idealState.getStateModelDefRef());
- AssertJUnit.assertEquals(resource.values().iterator().next()
- .getPartitions().size(), partitions);
- }
-
- @Test
- public void testMultipleResources() throws Exception
- {
-// List<IdealState> idealStates = new ArrayList<IdealState>();
- String[] resources = new String[]
- { "testResource1", "testResource2" };
- List<IdealState> idealStates = setupIdealState(5, resources, 10, 1);
- ResourceComputationStage stage = new ResourceComputationStage();
- runStage(event, new ReadClusterDataStage());
- runStage(event, stage);
-
- Map<String, Resource> resourceMap = event
- .getAttribute(AttributeName.RESOURCES.toString());
- AssertJUnit.assertEquals(resources.length, resourceMap.size());
-
- for (int i = 0; i < resources.length; i++)
- {
- String resourceName = resources[i];
- IdealState idealState = idealStates.get(i);
- AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
- AssertJUnit.assertEquals(resourceMap.get(resourceName)
- .getResourceName(), resourceName);
- AssertJUnit.assertEquals(resourceMap.get(resourceName)
- .getStateModelDefRef(), idealState.getStateModelDefRef());
- AssertJUnit.assertEquals(resourceMap.get(resourceName)
- .getPartitions().size(), idealState.getNumPartitions());
- }
- }
-
- @Test
- public void testMultipleResourcesWithSomeDropped() throws Exception
- {
- int nodes = 5;
- List<String> instances = new ArrayList<String>();
- for (int i = 0; i < nodes; i++)
- {
- instances.add("localhost_" + i);
- }
- String[] resources = new String[]
- { "testResource1", "testResource2" };
- List<IdealState> idealStates = new ArrayList<IdealState>();
- for (int i = 0; i < resources.length; i++)
- {
- int partitions = 10;
- int replicas = 1;
- String resourceName = resources[i];
- ZNRecord record = IdealStateCalculatorForStorageNode
- .calculateIdealState(instances, partitions, replicas,
- resourceName, "MASTER", "SLAVE");
- IdealState idealState = new IdealState(record);
- idealState.setStateModelDefRef("MasterSlave");
-
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.idealStates(resourceName),
- idealState);
-
-
- idealStates.add(idealState);
- }
- // ADD A LIVE INSTANCE WITH A CURRENT STATE THAT CONTAINS RESOURCE WHICH NO
- // LONGER EXISTS IN IDEALSTATE
- String instanceName = "localhost_" + 3;
- LiveInstance liveInstance = new LiveInstance(instanceName);
- String sessionId = UUID.randomUUID().toString();
- liveInstance.setSessionId(sessionId);
-
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.liveInstance(instanceName),
- liveInstance);
-
- String oldResource = "testResourceOld";
- CurrentState currentState = new CurrentState(oldResource);
- currentState.setState("testResourceOld_0", "OFFLINE");
- currentState.setState("testResourceOld_1", "SLAVE");
- currentState.setState("testResourceOld_2", "MASTER");
- currentState.setStateModelDefRef("MasterSlave");
- accessor.setProperty(keyBuilder.currentState(instanceName, sessionId, oldResource),
- currentState);
-
- ResourceComputationStage stage = new ResourceComputationStage();
- runStage(event, new ReadClusterDataStage());
- runStage(event, stage);
-
- Map<String, Resource> resourceMap = event
- .getAttribute(AttributeName.RESOURCES.toString());
- // +1 because it will have one for current state
- AssertJUnit.assertEquals(resources.length + 1, resourceMap.size());
-
- for (int i = 0; i < resources.length; i++)
- {
- String resourceName = resources[i];
- IdealState idealState = idealStates.get(i);
- AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
- AssertJUnit.assertEquals(resourceMap.get(resourceName)
- .getResourceName(), resourceName);
- AssertJUnit.assertEquals(resourceMap.get(resourceName)
- .getStateModelDefRef(), idealState.getStateModelDefRef());
- AssertJUnit.assertEquals(resourceMap.get(resourceName)
- .getPartitions().size(), idealState.getNumPartitions());
- }
- // Test the data derived from CurrentState
- AssertJUnit.assertTrue(resourceMap.containsKey(oldResource));
- AssertJUnit.assertEquals(resourceMap.get(oldResource)
- .getResourceName(), oldResource);
- AssertJUnit.assertEquals(resourceMap.get(oldResource)
- .getStateModelDefRef(), currentState.getStateModelDefRef());
- AssertJUnit
- .assertEquals(resourceMap.get(oldResource).getPartitions()
- .size(), currentState.getPartitionStateMap().size());
- AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_0"));
- AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_1"));
- AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_2"));
-
- }
-
- @Test
- public void testNull()
- {
- ClusterEvent event = new ClusterEvent("sampleEvent");
- ResourceComputationStage stage = new ResourceComputationStage();
- StageContext context = new StageContext();
- stage.init(context);
- stage.preProcess();
- boolean exceptionCaught = false;
- try
- {
- stage.process(event);
- } catch (Exception e)
- {
- exceptionCaught = true;
- }
- AssertJUnit.assertTrue(exceptionCaught);
- stage.postProcess();
- }
-
-
-// public void testEmptyCluster()
-// {
-// ClusterEvent event = new ClusterEvent("sampleEvent");
-// ClusterManager manager = new Mocks.MockManager();
-// event.addAttribute("clustermanager", manager);
-// ResourceComputationStage stage = new ResourceComputationStage();
-// StageContext context = new StageContext();
-// stage.init(context);
-// stage.preProcess();
-// boolean exceptionCaught = false;
-// try
-// {
-// stage.process(event);
-// } catch (Exception e)
-// {
-// exceptionCaught = true;
-// }
-// Assert.assertTrue(exceptionCaught);
-// stage.postProcess();
-// }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/healthcheck/TestAddDropAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/healthcheck/TestAddDropAlert.java b/helix-core/src/test/java/com/linkedin/helix/healthcheck/TestAddDropAlert.java
deleted file mode 100644
index 7411230..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/healthcheck/TestAddDropAlert.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.healthcheck;
-
-import java.util.Date;
-import java.util.Map;
-import java.util.Set;
-
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.healthcheck.HealthStatsAggregationTask;
-import com.linkedin.helix.healthcheck.ParticipantHealthReportCollectorImpl;
-import com.linkedin.helix.integration.ZkIntegrationTestBase;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.mock.storage.MockEspressoHealthReportProvider;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockTransition;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestAddDropAlert extends ZkIntegrationTestBase
-{
- ZkClient _zkClient;
- protected ClusterSetup _setupTool = null;
- protected final String _alertStr =
- "EXP(accumulate()(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
- protected final String _alertStatusStr = _alertStr; // +" : (*)";
- protected final String _dbName = "TestDB0";
-
- @BeforeClass()
- public void beforeClass() throws Exception
- {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
-
- _setupTool = new ClusterSetup(ZK_ADDR);
- }
-
- @AfterClass
- public void afterClass()
- {
- _zkClient.close();
- }
-
- public class AddDropAlertTransition extends MockTransition
- {
- @Override
- public void doTransition(Message message, NotificationContext context)
- {
- HelixManager manager = context.getManager();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- String fromState = message.getFromState();
- String toState = message.getToState();
- String instance = message.getTgtName();
- String partition = message.getPartitionName();
-
- if (fromState.equalsIgnoreCase("SLAVE") && toState.equalsIgnoreCase("MASTER"))
- {
-
- // add a stat and report to ZK
- // perhaps should keep reporter per instance...
- ParticipantHealthReportCollectorImpl reporter =
- new ParticipantHealthReportCollectorImpl(manager, instance);
- MockEspressoHealthReportProvider provider =
- new MockEspressoHealthReportProvider();
- reporter.addHealthReportProvider(provider);
- String statName = "latency";
- provider.setStat(_dbName, statName, "15");
- reporter.transmitHealthReports();
-
- // sleep long enough for first set of alerts to report and alert to get deleted
- // then change reported data
- try
- {
- Thread.sleep(10000);
- }
- catch (InterruptedException e)
- {
- System.err.println("Error sleeping");
- }
- provider.setStat(_dbName, statName, "1");
- reporter.transmitHealthReports();
-
- /*
- * for (int i = 0; i < 5; i++) { accessor.setProperty(PropertyType.HEALTHREPORT,
- * new ZNRecord("mockAlerts" + i), instance, "mockAlerts"); try {
- * Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated
- * catch block e.printStackTrace(); } }
- */
- }
- }
- }
-
- @Test()
- public void testAddDropAlert() throws Exception
- {
- String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
-
- System.out.println("START TestAddDropAlert at "
- + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource group
- 5, // number of nodes //change back to 5!!!
- 1, // replicas //change back to 3!!!
- "MasterSlave",
- true); // do rebalance
- // enableHealthCheck(clusterName);
-
- _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
-
- StartCMResult cmResult =
- TestHelper.startController(clusterName,
- "controller_0",
- ZK_ADDR,
- HelixControllerMain.STANDALONE);
- // start participants
- for (int i = 0; i < 5; i++) // !!!change back to 5
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] =
- new MockParticipant(clusterName,
- instanceName,
- ZK_ADDR,
- new AddDropAlertTransition());
- participants[i].syncStart();
-// new Thread(participants[i]).start();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- // drop alert soon after adding, but leave enough time for alert to fire once
- // Thread.sleep(3000);
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- new HealthStatsAggregationTask(cmResult._manager).run();
- String instance = "localhost_12918";
- ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
- Map<String, Map<String, String>> recMap = record.getMapFields();
- Set<String> keySet = recMap.keySet();
- Assert.assertTrue(keySet.size() > 0);
-
- _setupTool.getClusterManagementTool().dropAlert(clusterName, _alertStr);
- new HealthStatsAggregationTask(cmResult._manager).run();
- // other verifications go here
- // for (int i = 0; i < 1; i++) //change 1 back to 5
- // {
- // String instance = "localhost_" + (12918 + i);
- record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
- recMap = record.getMapFields();
- keySet = recMap.keySet();
- Assert.assertEquals(keySet.size(), 0);
- // }
-
- System.out.println("END TestAddDropAlert at " + new Date(System.currentTimeMillis()));
- }
-}