You are viewing a plain-text version of this content. The link to the canonical version was lost in the plain-text conversion; see the original mailing-list archive entry.
Posted to commits@helix.apache.org by sl...@apache.org on 2014/01/10 20:12:07 UTC

git commit: [HELIX-350] cluster status monitor should not be reset in FINALIZE type pipeline, rb=16772

Updated Branches:
  refs/heads/master 3b59bc39c -> 0e72af69c


[HELIX-350] cluster status monitor should not be reset in FINALIZE type pipeline, rb=16772


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0e72af69
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0e72af69
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0e72af69

Branch: refs/heads/master
Commit: 0e72af69cf03beba75383676e079b54799c51d55
Parents: 3b59bc3
Author: slu2011 <lu...@gmail.com>
Authored: Thu Jan 9 15:03:48 2014 -0800
Committer: slu2011 <lu...@gmail.com>
Committed: Thu Jan 9 15:03:48 2014 -0800

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  14 +-
 .../stages/ExternalViewComputeStage.java        |  24 +-
 .../DistClusterControllerElection.java          |   8 +-
 .../TestClusterStatusMonitorLifecycle.java      | 229 +++++++++++++++++++
 4 files changed, 253 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 9fef2da..b15627a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -250,11 +250,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     // Initialize _clusterStatusMonitor
     if (context != null) {
       if (context.getType() == Type.FINALIZE) {
-        if (_clusterStatusMonitor != null) {
-          _clusterStatusMonitor.reset();
-          _clusterStatusMonitor = null;
-        }
-
         stopRebalancingTimer();
         logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
         return;
@@ -533,5 +528,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     _lastSeenSessions.set(curSessions);
 
   }
-
+  
+  public void shutdownClusterStatusMonitor(String clusterName) {
+    if (_clusterStatusMonitor != null) {
+      logger.info("Shut down _clusterStatusMonitor for cluster " + clusterName);
+       _clusterStatusMonitor.reset();
+       _clusterStatusMonitor = null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 977b661..dddb0c0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -35,6 +35,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.config.SchedulerTaskConfig;
@@ -45,11 +46,13 @@ import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
 public class ExternalViewComputeStage extends AbstractBaseStage {
@@ -113,19 +116,16 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
         }
       }
 
-      // TODO fix this
       // Update cluster status monitor mbean
-      // ClusterStatusMonitor clusterStatusMonitor =
-      // (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-      // IdealState idealState = cache._idealStateMap.get(view.getResourceName());
-      // if (idealState != null) {
-      // if (clusterStatusMonitor != null
-      // && !idealState.getStateModelDefRef().equalsIgnoreCase(
-      // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-      // clusterStatusMonitor.onExternalViewChange(view,
-      // cache._idealStateMap.get(view.getResourceName()));
-      // }
-      // }
+       ClusterStatusMonitor clusterStatusMonitor = (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+       Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
+       if (currentResource != null) {
+         IdealState idealState = currentResource.getIdealState();
+         if (clusterStatusMonitor != null && 
+             !idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+         clusterStatusMonitor.onExternalViewChange(view, idealState);
+         }
+       }
 
       // compare the new external view with current one, set only on different
       ExternalView curExtView = curExtViews.get(resourceId.stringify());

http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
index 0e8c6fd..45bee64 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
@@ -95,13 +95,13 @@ public class DistClusterControllerElection implements ControllerChangeListener {
 
           }
         }
-      } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
-
-        if (_leader != null) {
+      } 
+      else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
+        if(_leader != null) {
           _leader.disconnect();
         }
+        _controller.shutdownClusterStatusMonitor(manager.getClusterName());
       }
-
     } catch (Exception e) {
       LOG.error("Exception when trying to become leader", e);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
new file mode 100644
index 0000000..373b024
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -0,0 +1,229 @@
+package org.apache.helix.monitoring;
+
+import java.io.IOException;
+import java.util.Date;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerNotification;
+import javax.management.MalformedObjectNameException;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.TestDistributedCMMain;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestClusterStatusMonitorLifecycle extends TestDistributedCMMain{
+
+  MockParticipantManager[] _participants;
+  ClusterDistributedController[] _controllers;
+  String _controllerClusterName;
+  String _clusterNamePrefix;
+  String _firstClusterName;
+  
+  final int n = 5;
+  final int clusterNb = 10;
+    
+  @BeforeClass
+  public void testDistributedCMMain() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    _clusterNamePrefix = className + "_" + methodName;
+
+    System.out
+        .println("START " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+
+    // setup 10 clusters
+    for (int i = 0; i < clusterNb; i++) {
+      String clusterName = _clusterNamePrefix + "0_" + i;
+      String participantName = "localhost" + i;
+      String resourceName = "TestDB" + i;
+      TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+          participantName, // participant name prefix
+          resourceName, // resource name prefix
+          1, // resources
+          8, // partitions per resource
+          n, // number of nodes
+          3, // replicas
+          "MasterSlave", true); // do rebalance
+    }
+
+    // setup controller cluster
+    _controllerClusterName = "CONTROLLER_" + _clusterNamePrefix;
+    TestHelper.setupCluster("CONTROLLER_" + _clusterNamePrefix, ZK_ADDR, 0, // controller
+                                                                           // port
+        "controller", // participant name prefix
+        _clusterNamePrefix, // resource name prefix
+        1, // resources
+        clusterNb, // partitions per resource
+        n, // number of nodes
+        3, // replicas
+        "LeaderStandby", true); // do rebalance
+
+    // start distributed cluster controllers
+    _controllers = new ClusterDistributedController[n + n];
+    for (int i = 0; i < n; i++) {
+      _controllers[i] =
+          new ClusterDistributedController(ZK_ADDR, _controllerClusterName, "controller_" + i);
+      _controllers[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(
+            new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _controllerClusterName),
+            30000);
+    Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+    // start first cluster
+    _participants = new MockParticipantManager[n];
+    _firstClusterName = _clusterNamePrefix + "0_0";
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost0_" + (12918 + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
+      _participants[i].syncStart();
+    }
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            _firstClusterName));
+    Assert.assertTrue(result, "first cluster NOT in ideal state");
+
+    // add more controllers to controller cluster
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    for (int i = 0; i < n; i++) {
+      String controller = "controller_" + (n + i);
+      setupTool.addInstanceToCluster(_controllerClusterName, controller);
+    }
+    setupTool.rebalanceStorageCluster(_controllerClusterName, _clusterNamePrefix + "0", 6);
+    for (int i = n; i < 2 * n; i++) {
+      _controllers[i] =
+          new ClusterDistributedController(ZK_ADDR, _controllerClusterName, "controller_" + i);
+      _controllers[i].syncStart();
+    }
+
+    // verify controller cluster
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                _controllerClusterName));
+    Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+    // verify first cluster
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            _firstClusterName));
+    Assert.assertTrue(result, "first cluster NOT in ideal state");
+  }
+  
+  @AfterClass
+  public void afterClass(){
+    System.out.println("Cleaning up...");
+    for (int i = 0; i < 5; i++) {
+      boolean result =
+          ClusterStateVerifier
+              .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                  _controllerClusterName));
+      _controllers[i].syncStop();
+    }
+
+    for (int i = 0; i < 5; i++) {
+      _participants[i].syncStop();
+    }
+
+    System.out.println("END " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+
+  }
+  
+  class ParticipantMonitorListener extends ClusterMBeanObserver {
+    
+    int _nMbeansUnregistered = 0;
+    int _nMbeansRegistered = 0;
+    public ParticipantMonitorListener(String domain)
+        throws InstanceNotFoundException, IOException,
+        MalformedObjectNameException, NullPointerException {
+      super(domain);
+    }
+
+    @Override
+    public void onMBeanRegistered(MBeanServerConnection server,
+        MBeanServerNotification mbsNotification) {
+      _nMbeansRegistered ++;
+    }
+
+    @Override
+    public void onMBeanUnRegistered(MBeanServerConnection server,
+        MBeanServerNotification mbsNotification) {
+      _nMbeansUnregistered++;
+    }}
+  
+  @Test
+  public void testClusterStatusMonitorLifecycle() throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException, IOException, InterruptedException{
+    ParticipantMonitorListener listener = new ParticipantMonitorListener("ClusterStatus");
+    
+    int nMbeansUnregistered = listener._nMbeansUnregistered;
+    int nMbeansRegistered = listener._nMbeansRegistered;
+    
+    _participants[0].disconnect();
+
+    // participant goes away. should be no change
+    Thread.sleep(1000);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
+    
+    HelixDataAccessor accessor = _participants[n-1].getHelixDataAccessor();
+    String firstControllerName = accessor.getProperty(accessor.keyBuilder().controllerLeader()).getId();
+    
+    ClusterDistributedController firstController = null;
+    for(ClusterDistributedController controller : _controllers)
+    {
+      if(controller.getInstanceName().equals(firstControllerName))
+      {
+        firstController = controller;
+      }
+    }
+    firstController.disconnect();
+    Thread.sleep(1000);
+    
+    // 1 cluster status monitor and 1 resource monitor
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 2);
+    
+    String instanceName = "localhost0_" + (12918 + 0);
+    _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
+    _participants[0].syncStart();
+    
+    // participant goes back. should be no change
+    Thread.sleep(1000);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 2);
+    
+    // Add a resource, one more mbean registered
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB00"));
+    
+    setupTool.addResourceToCluster(_firstClusterName, "TestDB1", idealState.getNumPartitions(), "MasterSlave");
+    setupTool.rebalanceResource(_firstClusterName, "TestDB1", Integer.parseInt(idealState.getReplicas()));
+    
+    Thread.sleep(1000);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 3);
+    
+    // remove resource, no change
+    setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
+    Thread.sleep(1000);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 3);
+    
+    
+  }
+}