You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/05/16 19:48:10 UTC

git commit: [HELIX-445] NPE in ZkPathDataDumpTask, rb=21504

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.2-release 6f4c5ffb3 -> 21296c95e


[HELIX-445] NPE in ZkPathDataDumpTask, rb=21504


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/21296c95
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/21296c95
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/21296c95

Branch: refs/heads/helix-0.6.2-release
Commit: 21296c95ec03fab115697792610fb8b56f008790
Parents: 6f4c5ff
Author: zzhang <zz...@apache.org>
Authored: Fri May 16 10:47:53 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri May 16 10:47:53 2014 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |   5 +
 .../main/java/org/apache/helix/PropertyKey.java |  26 +++
 .../apache/helix/manager/zk/ZKHelixManager.java |  16 +-
 .../helix/monitoring/ZKPathDataDumpTask.java    | 173 ++++++++++---------
 .../src/test/java/org/apache/helix/Mocks.java   |   1 -
 .../integration/TestHelixCustomCodeRunner.java  |  46 ++---
 .../helix/integration/TestSchedulerMessage.java |   2 +-
 .../manager/MockParticipantManager.java         |   1 -
 .../MockHealthReportParticipant.java            |  24 ---
 .../helix/mock/participant/MockJobIntf.java     |  28 ---
 .../TestClusterStatusMonitorLifecycle.java      |  21 ++-
 .../monitoring/TestParticipantMonitor.java      |   6 +-
 .../helix/monitoring/TestStatCollector.java     |   6 +-
 .../monitoring/TestZKPathDataDumpTask.java      | 113 ++++++++++++
 .../TestClusterAlertItemMBeanCollection.java    |   7 +-
 pom.xml                                         |   5 +
 16 files changed, 292 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index 55a03a0..a62fe55 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -126,6 +126,11 @@ under the License.
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math</artifactId>
       <version>2.1</version>

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 0874958..964f9b2 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -418,6 +418,15 @@ public class PropertyKey {
     }
 
     /**
+     * Get a property key associated with {@link StatusUpdate} of an instance
+     * @param instanceName
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey stateTransitionStatus(String instanceName) {
+      return new PropertyKey(STATUSUPDATES, StatusUpdate.class, _clusterName, instanceName);
+    }
+
+    /**
      * Used to get status update for a NON STATE TRANSITION type
      * @param instanceName
      * @param sessionId
@@ -461,6 +470,15 @@ public class PropertyKey {
     }
 
     /**
+     * Get a property key associated with {@link Error} of an instance
+     * @param instanceName
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey stateTransitionErrors(String instanceName) {
+      return new PropertyKey(ERRORS, Error.class, _clusterName, instanceName);
+    }
+
+    /**
      * Used to get status update for a NON STATE TRANSITION type
      * @param instanceName
      * @param sessionId
@@ -516,6 +534,14 @@ public class PropertyKey {
 
     /**
      * Get a property key associated with {@link StatusUpdate} of controller status updates
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey controllerTaskStatuses() {
+      return new PropertyKey(STATUSUPDATES_CONTROLLER, StatusUpdate.class, _clusterName);
+    }
+
+    /**
+     * Get a property key associated with {@link StatusUpdate} of controller status updates
      * @param subPath
      * @return {@link PropertyKey}
      */

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index a5ca409..18e3294 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -138,11 +138,9 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
    */
   static class StatusDumpTask extends HelixTimerTask {
     Timer _timer = null;
-    final ZkClient zkclient;
     final HelixManager helixController;
 
-    public StatusDumpTask(ZkClient zkclient, HelixManager helixController) {
-      this.zkclient = zkclient;
+    public StatusDumpTask(HelixManager helixController) {
       this.helixController = helixController;
     }
 
@@ -155,8 +153,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       if (_timer == null) {
         LOG.info("Start StatusDumpTask");
         _timer = new Timer("StatusDumpTimerTask", true);
-        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, zkclient,
-            timeThresholdNoChange), initialDelay, period);
+        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, timeThresholdNoChange),
+            initialDelay, period);
       }
     }
 
@@ -230,7 +228,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       _stateMachineEngine = null;
       _participantHealthInfoCollector = null;
       _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
-      _controllerTimerTasks.add(new StatusDumpTask(_zkclient, this));
+      _controllerTimerTasks.add(new StatusDumpTask(this));
 
       break;
     case CONTROLLER_PARTICIPANT:
@@ -241,7 +239,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
 
       _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
-      _controllerTimerTasks.add(new StatusDumpTask(_zkclient, this));
+      _controllerTimerTasks.add(new StatusDumpTask(this));
 
       break;
     case ADMINISTRATOR:
@@ -627,8 +625,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
       String fallbackPath = String.format("/%s/%s", _clusterName, "HELIX_PROPERTYSTORE");
       _helixPropertyStore =
-          new AutoFallbackPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient), path,
-              fallbackPath);
+          new AutoFallbackPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient),
+              path, fallbackPath);
     }
 
     return _helixPropertyStore;

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
index c62da62..a0190d2 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
@@ -19,36 +19,35 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
-import java.io.StringWriter;
-import java.util.Date;
 import java.util.List;
 import java.util.TimerTask;
 
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.data.Stat;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
+
+import com.google.common.collect.Lists;
 
 public class ZKPathDataDumpTask extends TimerTask {
-  static Logger logger = Logger.getLogger(ZKPathDataDumpTask.class);
+  static Logger LOG = Logger.getLogger(ZKPathDataDumpTask.class);
 
   private final int _thresholdNoChangeInMs;
   private final HelixManager _manager;
-  private final ZkClient _zkClient;
+  private final ZNRecordSerializer _jsonSerializer;
+
+  public ZKPathDataDumpTask(HelixManager manager, int thresholdNoChangeInMs) {
+    LOG.info("Init ZKPathDataDumpTask for cluster: " + manager.getClusterName()
+        + ", thresholdNoChangeInMs: " + thresholdNoChangeInMs);
 
-  public ZKPathDataDumpTask(HelixManager manager, ZkClient zkClient, int thresholdNoChangeInMs) {
     _manager = manager;
-    _zkClient = zkClient;
-    logger.info("Scanning cluster statusUpdate " + manager.getClusterName()
-        + " thresholdNoChangeInMs: " + thresholdNoChangeInMs);
+    _jsonSerializer = new ZNRecordSerializer();
     _thresholdNoChangeInMs = thresholdNoChangeInMs;
   }
 
@@ -59,88 +58,96 @@ public class ZKPathDataDumpTask extends TimerTask {
     // We need to think if we should create per-instance log files that contains
     // per-instance statusUpdates
     // and errors
-    logger.info("Scanning status updates ...");
-    try {
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      Builder keyBuilder = accessor.keyBuilder();
-
-      List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
-      for (String instanceName : instances) {
-        scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
-            PropertyType.STATUSUPDATES), _thresholdNoChangeInMs);
-        scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
-            PropertyType.ERRORS), _thresholdNoChangeInMs * 3);
-      }
-      scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
-          PropertyType.STATUSUPDATES_CONTROLLER), _thresholdNoChangeInMs);
+    LOG.info("Scan statusUpdates and errors for cluster: " + _manager.getClusterName()
+        + ", by controller: " + _manager);
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+    BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();
+
+    List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
+    for (String instance : instances) {
+      // dump participant status updates
+      String statusUpdatePath =
+          HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
+              PropertyType.STATUSUPDATES);
+      dump(baseAccessor, statusUpdatePath, _thresholdNoChangeInMs);
 
-      scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
-          PropertyType.ERRORS_CONTROLLER), _thresholdNoChangeInMs * 3);
-    } catch (Exception e) {
-      logger.error(e);
+      // dump participant errors
+      String errorPath =
+          HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
+              PropertyType.ERRORS);
+      dump(baseAccessor, errorPath, _thresholdNoChangeInMs * 3);
     }
+    // dump controller status updates
+    String controllerStatusUpdatePath =
+        HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
+            PropertyType.STATUSUPDATES_CONTROLLER);
+    dump(baseAccessor, controllerStatusUpdatePath, _thresholdNoChangeInMs);
+
+    // dump controller errors
+    String controllerErrorPath =
+        HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
+            PropertyType.ERRORS_CONTROLLER);
+    dump(baseAccessor, controllerErrorPath, _thresholdNoChangeInMs);
   }
 
-  void scanPath(String path, int thresholdNoChangeInMs) {
-    logger.info("Scanning path " + path);
-    List<String> subPaths = _zkClient.getChildren(path);
-    for (String subPath : subPaths) {
-      try {
-        String nextPath = path + "/" + subPath;
-        List<String> subSubPaths = _zkClient.getChildren(nextPath);
-        for (String subsubPath : subSubPaths) {
-          try {
-            checkAndDump(nextPath + "/" + subsubPath, thresholdNoChangeInMs);
-          } catch (Exception e) {
-            logger.error(e);
-          }
-        }
-      } catch (Exception e) {
-        logger.error(e);
+  /**
+   * Find paths of all leaf nodes under an ancestor path (exclusive)
+   * @param accessor
+   * @param ancestorPath
+   * @return a list of paths
+   */
+  static List<String> scanPath(BaseDataAccessor<ZNRecord> accessor, String ancestorPath) {
+    List<String> queue = Lists.newLinkedList();
+    queue.add(ancestorPath);
+
+    // BFS
+    List<String> leafPaths = Lists.newArrayList();
+    while (!queue.isEmpty()) {
+      String path = queue.remove(0);
+      List<String> childNames = accessor.getChildNames(path, 0);
+      if (childNames == null) {
+        // path doesn't exist
+        continue;
+      }
+      if (childNames.isEmpty() && !path.equals(ancestorPath)) {
+        // leaf node, excluding ancestorPath
+        leafPaths.add(path);
+      }
+      for (String childName : childNames) {
+        String subPath = String.format("%s/%s", path, childName);
+        queue.add(subPath);
       }
     }
+    return leafPaths;
   }
 
-  void checkAndDump(String path, int thresholdNoChangeInMs) {
-    List<String> subPaths = _zkClient.getChildren(path);
-    if (subPaths.size() == 0) {
-      subPaths.add("");
+  void dump(BaseDataAccessor<ZNRecord> accessor, String ancestorPath, int threshold) {
+    List<String> leafPaths = scanPath(accessor, ancestorPath);
+    if (leafPaths.isEmpty()) {
+      return;
+    }
+
+    Stat[] stats = accessor.getStats(leafPaths, 0);
+    List<String> dumpPaths = Lists.newArrayList();
+    long now = System.currentTimeMillis();
+    for (int i = 0; i < stats.length; i++) {
+      Stat stat = stats[i];
+      if ((now - stat.getMtime()) > threshold) {
+        dumpPaths.add(leafPaths.get(i));
+      }
     }
-    for (String subPath : subPaths) {
-      String fullPath = subPath.length() > 0 ? path + "/" + subPath : path;
-      Stat pathStat = _zkClient.getStat(fullPath);
-
-      long lastModifiedTimeInMs = pathStat.getMtime();
-      long nowInMs = new Date().getTime();
-      // logger.info(nowInMs + " " + lastModifiedTimeInMs + " " + fullPath);
-
-      // Check the last modified time
-      if (nowInMs > lastModifiedTimeInMs) {
-        long timeDiff = nowInMs - lastModifiedTimeInMs;
-        if (timeDiff > thresholdNoChangeInMs) {
-          logger.info("Dumping status update path " + fullPath + " " + timeDiff + "MS has passed");
-          _zkClient.setZkSerializer(new ZNRecordSerializer());
-          ZNRecord record = _zkClient.readData(fullPath);
-
-          // dump the node content into log file
-          ObjectMapper mapper = new ObjectMapper();
-          SerializationConfig serializationConfig = mapper.getSerializationConfig();
-          serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-          StringWriter sw = new StringWriter();
-          try {
-            mapper.writeValue(sw, record);
-            logger.info(sw.toString());
-          } catch (Exception e) {
-            logger
-                .warn(
-                    "Exception during serialization in ZKPathDataDumpTask.checkAndDump. This can mostly be ignored",
-                    e);
-          }
-          // Delete the leaf data
-          _zkClient.deleteRecursive(fullPath);
-        }
+
+    // dump
+    LOG.info("Dump statusUpdates and errors records for pahts: " + dumpPaths);
+    List<ZNRecord> dumpRecords = accessor.get(dumpPaths, null, 0);
+    for (ZNRecord record : dumpRecords) {
+      if (record != null) {
+        LOG.info(new String(_jsonSerializer.serialize(record)));
       }
     }
+
+    // clean up
+    accessor.remove(dumpPaths, 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 9e2452b..a67115b 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -46,7 +46,6 @@ import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.zookeeper.data.Stat;
-import org.omg.CORBA._PolicyStub;
 
 public class Mocks {
   public static class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
index f6a7098..61fd85b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
@@ -21,6 +21,8 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
@@ -30,9 +32,10 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockJobIntf;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.participant.CustomCodeCallbackHandler;
 import org.apache.helix.participant.HelixCustomCodeRunner;
@@ -62,30 +65,20 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
 
   }
 
-  class MockJob implements MockJobIntf {
-    @Override
-    public void doPreConnectJob(HelixManager manager) {
-      try {
-        // delay the start of the 1st participant
-        // so there will be a leadership transfer from localhost_12919 to 12918
-        if (manager.getInstanceName().equals("localhost_12918")) {
-          Thread.sleep(2000);
-        }
-
-        HelixCustomCodeRunner customCodeRunner = new HelixCustomCodeRunner(manager, ZK_ADDR);
-        customCodeRunner.invoke(_callback).on(ChangeType.LIVE_INSTANCE)
-            .usingLeaderStandbyModel("TestParticLeader").start();
-      } catch (Exception e) {
-        LOG.error("Exception do pre-connect job", e);
+  private void registerCustomCodeRunner(HelixManager manager) {
+    try {
+      // delay the start of the 1st participant
+      // so there will be a leadership transfer from localhost_12919 to 12918
+      if (manager.getInstanceName().equals("localhost_12918")) {
+        Thread.sleep(2000);
       }
-    }
-
-    @Override
-    public void doPostConnectJob(HelixManager manager) {
-      // TODO Auto-generated method stub
 
+      HelixCustomCodeRunner customCodeRunner = new HelixCustomCodeRunner(manager, ZK_ADDR);
+      customCodeRunner.invoke(_callback).on(ChangeType.LIVE_INSTANCE)
+          .usingLeaderStandbyModel("TestParticLeader").start();
+    } catch (Exception e) {
+      LOG.error("Exception do pre-connect job", e);
     }
-
   }
 
   @Test
@@ -109,10 +102,9 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
     for (int i = 0; i < _nodeNb; i++) {
       String instanceName = "localhost_" + (_startPort + i);
 
-      MockJob job = new MockJob();
       participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
 
-      job.doPreConnectJob(participants[i]);
+      registerCustomCodeRunner(participants[i]);
       participants[i].syncStart();
     }
     boolean result =
@@ -125,9 +117,9 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
     _callback._isCallbackInvoked = false;
 
     // add a new live instance
-    // ZkClient zkClient = new ZkClient(ZK_ADDR);
-    // zkClient.setZkSerializer(new ZNRecordSerializer());
-    ZKHelixDataAccessor accessor =
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addInstance(_clusterName, new InstanceConfig("newLiveInstance"));
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 8c69a38..281b306 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -393,7 +393,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
       }
     }
     Thread.sleep(3000);
-    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _gZkClient, 0);
+    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, 0);
     dumpTask.run();
 
     subPaths = _gZkClient.getChildren(controllerStatusPath);

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 34efe34..917be17 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -28,7 +28,6 @@ import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
 import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
-import org.apache.helix.mock.participant.MockJobIntf;
 import org.apache.helix.mock.participant.MockMSModelFactory;
 import org.apache.helix.mock.participant.MockSchemataModelFactory;
 import org.apache.helix.mock.participant.MockTransition;

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
index 31811bb..04207d6 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
@@ -122,30 +122,6 @@ public class MockHealthReportParticipant {
     }
   }
 
-  static class MockHealthReportJob implements MockJobIntf {
-
-    @Override
-    public void doPreConnectJob(HelixManager manager) {
-      // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void doPostConnectJob(HelixManager manager) {
-      // TODO Auto-generated method stub
-      manager.getHealthReportCollector().addHealthReportProvider(new MockHealthReportProvider());
-
-      // // set property store path for perf test
-      // final String setPath = "/TEST_PERF/set";
-      // final String updatePath = "/TEST_PERF/update";
-      // manager.getHelixPropertyStore().create(setPath, new ZNRecord(setPath),
-      // BaseDataAccessor.Option.PERSISTENT);
-      // manager.getHelixPropertyStore().set(updatePath, new ZNRecord(updatePath),
-      // BaseDataAccessor.Option.PERSISTENT);
-    }
-
-  }
-
   // hack OptionBuilder is not thread safe
   @SuppressWarnings("static-access")
   synchronized private static Options constructCommandLineOptions() {

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java
deleted file mode 100644
index 4b637a6..0000000
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.helix.mock.participant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixManager;
-
-public interface MockJobIntf {
-  public void doPreConnectJob(HelixManager manager);
-
-  public void doPostConnectJob(HelixManager manager);
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 58b691a..0981a2e 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -1,5 +1,24 @@
 package org.apache.helix.monitoring;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.io.IOException;
 import java.util.Date;
 
@@ -183,7 +202,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
 
     // 1 participant goes away
     // No change in instance/resource mbean
-    // Unregister 1 per-instance resource mbean 
+    // Unregister 1 per-instance resource mbean
     Thread.sleep(1000);
     Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 1);
     Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
index 74b1a89..5f44b36 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
@@ -96,10 +96,8 @@ public class TestParticipantMonitor {
     }
   }
 
-  @Test(groups = {
-    "unitTest"
-  })
-  public void TestReportData() throws InstanceNotFoundException, MalformedObjectNameException,
+  @Test()
+  public void testReportData() throws InstanceNotFoundException, MalformedObjectNameException,
       NullPointerException, IOException, InterruptedException {
     System.out.println("START TestParticipantMonitor");
     ParticipantMonitor monitor = new ParticipantMonitor();

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
index c35c961..7a4a941 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
@@ -23,10 +23,8 @@ import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
 public class TestStatCollector {
-  @Test(groups = {
-    "unitTest"
-  })
-  public void TestCollectData() {
+  @Test()
+  public void testCollectData() {
     StatCollector collector = new StatCollector();
 
     int nPoints = 100;

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
new file mode 100644
index 0000000..a3d8ae3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
@@ -0,0 +1,113 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.model.Error;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Integration test for {@link ZKPathDataDumpTask}, run against the ZooKeeper instance at
+ * {@code ZK_ADDR} with a mocked {@link HelixManager}.
+ * <p>
+ * It checks two things: running the task when no status updates or errors have been written
+ * leaves the parent statusUpdate/error paths in place, and running it after such records have
+ * been written removes those records while still keeping the parent paths.
+ */
+public class TestZKPathDataDumpTask extends ZkUnitTestBase {
+
+  /** Name of the single participant created by the cluster setup (prefix + "_" + port). */
+  private static final String INSTANCE_NAME = "localhost_12918";
+
+  /**
+   * Runs the dump task twice, first on a freshly set-up cluster and then after writing
+   * participant and controller statusUpdate/error records, verifying path existence each time.
+   *
+   * @throws Exception if cluster setup or any ZooKeeper access fails
+   */
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 1;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        2, // partitions per resource
+        n, // number of nodes
+        1, // replicas
+        "MasterSlave", true); // do rebalance
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();
+
+    // only the two calls stubbed below are given behavior on the mocked manager
+    HelixManager manager = mock(HelixManager.class);
+    when(manager.getHelixDataAccessor()).thenReturn(accessor);
+    when(manager.getClusterName()).thenReturn(clusterName);
+
+    // run dump task without statusUpdates and errors, should not remove any existing
+    // statusUpdate/error paths
+    // NOTE(review): the second constructor argument is presumably a threshold of 0 so that
+    // every record is eligible for dumping -- confirm against ZKPathDataDumpTask
+    ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, 0);
+    task.run();
+    assertParentPathsExist(baseAccessor, keyBuilder);
+
+    // add participant status updates and errors
+    PropertyKey statusUpdateKey =
+        keyBuilder.stateTransitionStatus(INSTANCE_NAME, "session_0", "TestDB0", "TestDB0_0");
+    accessor.setProperty(statusUpdateKey, new StatusUpdate(new ZNRecord("statusUpdate")));
+    PropertyKey errorKey =
+        keyBuilder.stateTransitionError(INSTANCE_NAME, "session_0", "TestDB0", "TestDB0_0");
+    accessor.setProperty(errorKey, new Error(new ZNRecord("error")));
+
+    // add controller status updates and errors
+    PropertyKey controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB");
+    accessor.setProperty(controllerStatusUpdateKey,
+        new StatusUpdate(new ZNRecord("controllerStatusUpdate")));
+    PropertyKey controllerErrorKey = keyBuilder.controllerTaskError("TestDB_error");
+    accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError")));
+
+    // run dump task, should remove existing statusUpdate/error paths
+    task.run();
+    Assert.assertFalse(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+    Assert.assertFalse(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+    Assert.assertFalse(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+    Assert.assertFalse(baseAccessor.exists(errorKey.getPath(), 0));
+
+    // the parent paths must survive the dump
+    assertParentPathsExist(baseAccessor, keyBuilder);
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Asserts that the controller-level and participant-level statusUpdate/error parent paths
+   * all exist. The participant error path was previously computed but never checked.
+   *
+   * @param baseAccessor accessor used to probe path existence
+   * @param keyBuilder builder that produces the property keys for the cluster under test
+   */
+  private static void assertParentPathsExist(BaseDataAccessor<ZNRecord> baseAccessor,
+      PropertyKey.Builder keyBuilder) {
+    assertPathExists(baseAccessor, keyBuilder.controllerTaskStatuses());
+    assertPathExists(baseAccessor, keyBuilder.controllerTaskErrors());
+    assertPathExists(baseAccessor, keyBuilder.stateTransitionStatus(INSTANCE_NAME));
+    assertPathExists(baseAccessor, keyBuilder.stateTransitionErrors(INSTANCE_NAME));
+  }
+
+  /**
+   * Asserts that the path behind {@code key} exists, naming the path in the failure message.
+   *
+   * @param baseAccessor accessor used to probe path existence
+   * @param key property key whose path is checked
+   */
+  private static void assertPathExists(BaseDataAccessor<ZNRecord> baseAccessor, PropertyKey key) {
+    String path = key.getPath();
+    Assert.assertTrue(baseAccessor.exists(path, 0), "expected path to exist: " + path);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAlertItemMBeanCollection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAlertItemMBeanCollection.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAlertItemMBeanCollection.java
index e383291..d2e5395 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAlertItemMBeanCollection.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAlertItemMBeanCollection.java
@@ -22,7 +22,6 @@ package org.apache.helix.monitoring.mbeans;
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import javax.management.AttributeNotFoundException;
@@ -32,14 +31,12 @@ import javax.management.MBeanException;
 import javax.management.MalformedObjectNameException;
 import javax.management.ReflectionException;
 
-import org.apache.helix.ZNRecord;
 import org.apache.helix.alerts.AlertValueAndStatus;
 import org.apache.helix.alerts.Tuple;
 import org.apache.helix.healthcheck.TestWildcardAlert.TestClusterMBeanObserver;
 import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -47,7 +44,7 @@ public class TestClusterAlertItemMBeanCollection {
   private static final Logger _logger = Logger.getLogger(TestClusterAlertItemMBeanCollection.class);
 
   @Test
-  public void TestAlertReportingHistory() throws InstanceNotFoundException,
+  public void testAlertReportingHistory() throws InstanceNotFoundException,
       MalformedObjectNameException, NullPointerException, IOException, IntrospectionException,
       AttributeNotFoundException, ReflectionException, MBeanException {
     ClusterAlertMBeanCollection beanCollection = new ClusterAlertMBeanCollection();
@@ -226,7 +223,7 @@ public class TestClusterAlertItemMBeanCollection {
   }
 
   @Test
-  public void TestAlertRefresh() throws InstanceNotFoundException, MalformedObjectNameException,
+  public void testAlertRefresh() throws InstanceNotFoundException, MalformedObjectNameException,
       NullPointerException, IOException, IntrospectionException, AttributeNotFoundException,
       ReflectionException, MBeanException, InterruptedException {
     ClusterAlertMBeanCollection beanCollection = new ClusterAlertMBeanCollection();

http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b14e164..bbebf55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -290,6 +290,11 @@ under the License.
         <version>6.0.1</version>
       </dependency>
       <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-all</artifactId>
+        <version>1.9.5</version>
+      </dependency>
+      <dependency>
         <groupId>org.yaml</groupId>
         <artifactId>snakeyaml</artifactId>
         <version>1.12</version>