You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@helix.apache.org by sl...@apache.org on 2014/01/10 20:12:07 UTC
git commit: [HELIX-350] cluster status monitor should not be reset in a FINALIZE-type pipeline, rb=16772
Updated Branches:
refs/heads/master 3b59bc39c -> 0e72af69c
[HELIX-350] cluster status monitor should not be reset in a FINALIZE-type pipeline, rb=16772
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0e72af69
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0e72af69
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0e72af69
Branch: refs/heads/master
Commit: 0e72af69cf03beba75383676e079b54799c51d55
Parents: 3b59bc3
Author: slu2011 <lu...@gmail.com>
Authored: Thu Jan 9 15:03:48 2014 -0800
Committer: slu2011 <lu...@gmail.com>
Committed: Thu Jan 9 15:03:48 2014 -0800
----------------------------------------------------------------------
.../controller/GenericHelixController.java | 14 +-
.../stages/ExternalViewComputeStage.java | 24 +-
.../DistClusterControllerElection.java | 8 +-
.../TestClusterStatusMonitorLifecycle.java | 229 +++++++++++++++++++
4 files changed, 253 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 9fef2da..b15627a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -250,11 +250,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
// Initialize _clusterStatusMonitor
if (context != null) {
if (context.getType() == Type.FINALIZE) {
- if (_clusterStatusMonitor != null) {
- _clusterStatusMonitor.reset();
- _clusterStatusMonitor = null;
- }
-
stopRebalancingTimer();
logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
return;
@@ -533,5 +528,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
_lastSeenSessions.set(curSessions);
}
-
+
+ public void shutdownClusterStatusMonitor(String clusterName) {
+ if (_clusterStatusMonitor != null) {
+ logger.info("Shut down _clusterStatusMonitor for cluster " + clusterName);
+ _clusterStatusMonitor.reset();
+ _clusterStatusMonitor = null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 977b661..dddb0c0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -35,6 +35,7 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordDelta;
import org.apache.helix.ZNRecordDelta.MergeOperation;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.config.SchedulerTaskConfig;
@@ -45,11 +46,13 @@ import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
public class ExternalViewComputeStage extends AbstractBaseStage {
@@ -113,19 +116,16 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
}
}
- // TODO fix this
// Update cluster status monitor mbean
- // ClusterStatusMonitor clusterStatusMonitor =
- // (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- // IdealState idealState = cache._idealStateMap.get(view.getResourceName());
- // if (idealState != null) {
- // if (clusterStatusMonitor != null
- // && !idealState.getStateModelDefRef().equalsIgnoreCase(
- // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- // clusterStatusMonitor.onExternalViewChange(view,
- // cache._idealStateMap.get(view.getResourceName()));
- // }
- // }
+ ClusterStatusMonitor clusterStatusMonitor = (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+ Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
+ if (currentResource != null) {
+ IdealState idealState = currentResource.getIdealState();
+ if (clusterStatusMonitor != null &&
+ !idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ clusterStatusMonitor.onExternalViewChange(view, idealState);
+ }
+ }
// compare the new external view with current one, set only on different
ExternalView curExtView = curExtViews.get(resourceId.stringify());
http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
index 0e8c6fd..45bee64 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
@@ -95,13 +95,13 @@ public class DistClusterControllerElection implements ControllerChangeListener {
}
}
- } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
-
- if (_leader != null) {
+ }
+ else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
+ if(_leader != null) {
_leader.disconnect();
}
+ _controller.shutdownClusterStatusMonitor(manager.getClusterName());
}
-
} catch (Exception e) {
LOG.error("Exception when trying to become leader", e);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
new file mode 100644
index 0000000..373b024
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -0,0 +1,229 @@
+package org.apache.helix.monitoring;
+
+import java.io.IOException;
+import java.util.Date;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerNotification;
+import javax.management.MalformedObjectNameException;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.TestDistributedCMMain;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestClusterStatusMonitorLifecycle extends TestDistributedCMMain{
+
+ MockParticipantManager[] _participants;
+ ClusterDistributedController[] _controllers;
+ String _controllerClusterName;
+ String _clusterNamePrefix;
+ String _firstClusterName;
+
+ final int n = 5;
+ final int clusterNb = 10;
+
+ @BeforeClass
+ public void testDistributedCMMain() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ _clusterNamePrefix = className + "_" + methodName;
+
+ System.out
+ .println("START " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+
+ // setup 10 clusters
+ for (int i = 0; i < clusterNb; i++) {
+ String clusterName = _clusterNamePrefix + "0_" + i;
+ String participantName = "localhost" + i;
+ String resourceName = "TestDB" + i;
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ participantName, // participant name prefix
+ resourceName, // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+ }
+
+ // setup controller cluster
+ _controllerClusterName = "CONTROLLER_" + _clusterNamePrefix;
+ TestHelper.setupCluster("CONTROLLER_" + _clusterNamePrefix, ZK_ADDR, 0, // controller
+ // port
+ "controller", // participant name prefix
+ _clusterNamePrefix, // resource name prefix
+ 1, // resources
+ clusterNb, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "LeaderStandby", true); // do rebalance
+
+ // start distributed cluster controllers
+ _controllers = new ClusterDistributedController[n + n];
+ for (int i = 0; i < n; i++) {
+ _controllers[i] =
+ new ClusterDistributedController(ZK_ADDR, _controllerClusterName, "controller_" + i);
+ _controllers[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _controllerClusterName),
+ 30000);
+ Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+ // start first cluster
+ _participants = new MockParticipantManager[n];
+ _firstClusterName = _clusterNamePrefix + "0_0";
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost0_" + (12918 + i);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
+ _participants[i].syncStart();
+ }
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ _firstClusterName));
+ Assert.assertTrue(result, "first cluster NOT in ideal state");
+
+ // add more controllers to controller cluster
+ ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ for (int i = 0; i < n; i++) {
+ String controller = "controller_" + (n + i);
+ setupTool.addInstanceToCluster(_controllerClusterName, controller);
+ }
+ setupTool.rebalanceStorageCluster(_controllerClusterName, _clusterNamePrefix + "0", 6);
+ for (int i = n; i < 2 * n; i++) {
+ _controllers[i] =
+ new ClusterDistributedController(ZK_ADDR, _controllerClusterName, "controller_" + i);
+ _controllers[i].syncStart();
+ }
+
+ // verify controller cluster
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ _controllerClusterName));
+ Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+ // verify first cluster
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ _firstClusterName));
+ Assert.assertTrue(result, "first cluster NOT in ideal state");
+ }
+
+ @AfterClass
+ public void afterClass(){
+ System.out.println("Cleaning up...");
+ for (int i = 0; i < 5; i++) {
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ _controllerClusterName));
+ _controllers[i].syncStop();
+ }
+
+ for (int i = 0; i < 5; i++) {
+ _participants[i].syncStop();
+ }
+
+ System.out.println("END " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+
+ }
+
+ class ParticipantMonitorListener extends ClusterMBeanObserver {
+
+ int _nMbeansUnregistered = 0;
+ int _nMbeansRegistered = 0;
+ public ParticipantMonitorListener(String domain)
+ throws InstanceNotFoundException, IOException,
+ MalformedObjectNameException, NullPointerException {
+ super(domain);
+ }
+
+ @Override
+ public void onMBeanRegistered(MBeanServerConnection server,
+ MBeanServerNotification mbsNotification) {
+ _nMbeansRegistered ++;
+ }
+
+ @Override
+ public void onMBeanUnRegistered(MBeanServerConnection server,
+ MBeanServerNotification mbsNotification) {
+ _nMbeansUnregistered++;
+ }}
+
+ @Test
+ public void testClusterStatusMonitorLifecycle() throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException, IOException, InterruptedException{
+ ParticipantMonitorListener listener = new ParticipantMonitorListener("ClusterStatus");
+
+ int nMbeansUnregistered = listener._nMbeansUnregistered;
+ int nMbeansRegistered = listener._nMbeansRegistered;
+
+ _participants[0].disconnect();
+
+ // participant goes away. should be no change
+ Thread.sleep(1000);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
+
+ HelixDataAccessor accessor = _participants[n-1].getHelixDataAccessor();
+ String firstControllerName = accessor.getProperty(accessor.keyBuilder().controllerLeader()).getId();
+
+ ClusterDistributedController firstController = null;
+ for(ClusterDistributedController controller : _controllers)
+ {
+ if(controller.getInstanceName().equals(firstControllerName))
+ {
+ firstController = controller;
+ }
+ }
+ firstController.disconnect();
+ Thread.sleep(1000);
+
+ // 1 cluster status monitor and 1 resource monitor
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 2);
+
+ String instanceName = "localhost0_" + (12918 + 0);
+ _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
+ _participants[0].syncStart();
+
+ // participant goes back. should be no change
+ Thread.sleep(1000);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 2);
+
+ // Add a resource, one more mbean registered
+ ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB00"));
+
+ setupTool.addResourceToCluster(_firstClusterName, "TestDB1", idealState.getNumPartitions(), "MasterSlave");
+ setupTool.rebalanceResource(_firstClusterName, "TestDB1", Integer.parseInt(idealState.getReplicas()));
+
+ Thread.sleep(1000);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 3);
+
+ // remove resource, no change
+ setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
+ Thread.sleep(1000);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 3);
+
+
+ }
+}