You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/05/16 19:48:10 UTC
git commit: [HELIX-445] NPE in ZkPathDataDumpTask, rb=21504
Repository: helix
Updated Branches:
refs/heads/helix-0.6.2-release 6f4c5ffb3 -> 21296c95e
[HELIX-445] NPE in ZkPathDataDumpTask, rb=21504
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/21296c95
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/21296c95
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/21296c95
Branch: refs/heads/helix-0.6.2-release
Commit: 21296c95ec03fab115697792610fb8b56f008790
Parents: 6f4c5ff
Author: zzhang <zz...@apache.org>
Authored: Fri May 16 10:47:53 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri May 16 10:47:53 2014 -0700
----------------------------------------------------------------------
helix-core/pom.xml | 5 +
.../main/java/org/apache/helix/PropertyKey.java | 26 +++
.../apache/helix/manager/zk/ZKHelixManager.java | 16 +-
.../helix/monitoring/ZKPathDataDumpTask.java | 173 ++++++++++---------
.../src/test/java/org/apache/helix/Mocks.java | 1 -
.../integration/TestHelixCustomCodeRunner.java | 46 ++---
.../helix/integration/TestSchedulerMessage.java | 2 +-
.../manager/MockParticipantManager.java | 1 -
.../MockHealthReportParticipant.java | 24 ---
.../helix/mock/participant/MockJobIntf.java | 28 ---
.../TestClusterStatusMonitorLifecycle.java | 21 ++-
.../monitoring/TestParticipantMonitor.java | 6 +-
.../helix/monitoring/TestStatCollector.java | 6 +-
.../monitoring/TestZKPathDataDumpTask.java | 113 ++++++++++++
.../TestClusterAlertItemMBeanCollection.java | 7 +-
pom.xml | 5 +
16 files changed, 292 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index 55a03a0..a62fe55 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -126,6 +126,11 @@ under the License.
</exclusions>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math</artifactId>
<version>2.1</version>
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 0874958..964f9b2 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -418,6 +418,15 @@ public class PropertyKey {
}
/**
+ * Get a property key associated with {@link StatusUpdate} of an instance
+ * @param instanceName
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey stateTransitionStatus(String instanceName) {
+ return new PropertyKey(STATUSUPDATES, StatusUpdate.class, _clusterName, instanceName);
+ }
+
+ /**
* Used to get status update for a NON STATE TRANSITION type
* @param instanceName
* @param sessionId
@@ -461,6 +470,15 @@ public class PropertyKey {
}
/**
+ * Get a property key associated with {@link Error} of an instance
+ * @param instanceName
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey stateTransitionErrors(String instanceName) {
+ return new PropertyKey(ERRORS, Error.class, _clusterName, instanceName);
+ }
+
+ /**
* Used to get status update for a NON STATE TRANSITION type
* @param instanceName
* @param sessionId
@@ -516,6 +534,14 @@ public class PropertyKey {
/**
* Get a property key associated with {@link StatusUpdate} of controller status updates
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey controllerTaskStatuses() {
+ return new PropertyKey(STATUSUPDATES_CONTROLLER, StatusUpdate.class, _clusterName);
+ }
+
+ /**
+ * Get a property key associated with {@link StatusUpdate} of controller status updates
* @param subPath
* @return {@link PropertyKey}
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index a5ca409..18e3294 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -138,11 +138,9 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
*/
static class StatusDumpTask extends HelixTimerTask {
Timer _timer = null;
- final ZkClient zkclient;
final HelixManager helixController;
- public StatusDumpTask(ZkClient zkclient, HelixManager helixController) {
- this.zkclient = zkclient;
+ public StatusDumpTask(HelixManager helixController) {
this.helixController = helixController;
}
@@ -155,8 +153,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
if (_timer == null) {
LOG.info("Start StatusDumpTask");
_timer = new Timer("StatusDumpTimerTask", true);
- _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, zkclient,
- timeThresholdNoChange), initialDelay, period);
+ _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, timeThresholdNoChange),
+ initialDelay, period);
}
}
@@ -230,7 +228,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
_stateMachineEngine = null;
_participantHealthInfoCollector = null;
_controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
- _controllerTimerTasks.add(new StatusDumpTask(_zkclient, this));
+ _controllerTimerTasks.add(new StatusDumpTask(this));
break;
case CONTROLLER_PARTICIPANT:
@@ -241,7 +239,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
_timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
_controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
- _controllerTimerTasks.add(new StatusDumpTask(_zkclient, this));
+ _controllerTimerTasks.add(new StatusDumpTask(this));
break;
case ADMINISTRATOR:
@@ -627,8 +625,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
String fallbackPath = String.format("/%s/%s", _clusterName, "HELIX_PROPERTYSTORE");
_helixPropertyStore =
- new AutoFallbackPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient), path,
- fallbackPath);
+ new AutoFallbackPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient),
+ path, fallbackPath);
}
return _helixPropertyStore;
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
index c62da62..a0190d2 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
@@ -19,36 +19,35 @@ package org.apache.helix.monitoring;
* under the License.
*/
-import java.io.StringWriter;
-import java.util.Date;
import java.util.List;
import java.util.TimerTask;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
import org.apache.zookeeper.data.Stat;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
+
+import com.google.common.collect.Lists;
public class ZKPathDataDumpTask extends TimerTask {
- static Logger logger = Logger.getLogger(ZKPathDataDumpTask.class);
+ static Logger LOG = Logger.getLogger(ZKPathDataDumpTask.class);
private final int _thresholdNoChangeInMs;
private final HelixManager _manager;
- private final ZkClient _zkClient;
+ private final ZNRecordSerializer _jsonSerializer;
+
+ public ZKPathDataDumpTask(HelixManager manager, int thresholdNoChangeInMs) {
+ LOG.info("Init ZKPathDataDumpTask for cluster: " + manager.getClusterName()
+ + ", thresholdNoChangeInMs: " + thresholdNoChangeInMs);
- public ZKPathDataDumpTask(HelixManager manager, ZkClient zkClient, int thresholdNoChangeInMs) {
_manager = manager;
- _zkClient = zkClient;
- logger.info("Scanning cluster statusUpdate " + manager.getClusterName()
- + " thresholdNoChangeInMs: " + thresholdNoChangeInMs);
+ _jsonSerializer = new ZNRecordSerializer();
_thresholdNoChangeInMs = thresholdNoChangeInMs;
}
@@ -59,88 +58,96 @@ public class ZKPathDataDumpTask extends TimerTask {
// We need to think if we should create per-instance log files that contains
// per-instance statusUpdates
// and errors
- logger.info("Scanning status updates ...");
- try {
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
- for (String instanceName : instances) {
- scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
- PropertyType.STATUSUPDATES), _thresholdNoChangeInMs);
- scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
- PropertyType.ERRORS), _thresholdNoChangeInMs * 3);
- }
- scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
- PropertyType.STATUSUPDATES_CONTROLLER), _thresholdNoChangeInMs);
+ LOG.info("Scan statusUpdates and errors for cluster: " + _manager.getClusterName()
+ + ", by controller: " + _manager);
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+ BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();
+
+ List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
+ for (String instance : instances) {
+ // dump participant status updates
+ String statusUpdatePath =
+ HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
+ PropertyType.STATUSUPDATES);
+ dump(baseAccessor, statusUpdatePath, _thresholdNoChangeInMs);
- scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
- PropertyType.ERRORS_CONTROLLER), _thresholdNoChangeInMs * 3);
- } catch (Exception e) {
- logger.error(e);
+ // dump participant errors
+ String errorPath =
+ HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
+ PropertyType.ERRORS);
+ dump(baseAccessor, errorPath, _thresholdNoChangeInMs * 3);
}
+ // dump controller status updates
+ String controllerStatusUpdatePath =
+ HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
+ PropertyType.STATUSUPDATES_CONTROLLER);
+ dump(baseAccessor, controllerStatusUpdatePath, _thresholdNoChangeInMs);
+
+ // dump controller errors
+ String controllerErrorPath =
+ HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
+ PropertyType.ERRORS_CONTROLLER);
+ dump(baseAccessor, controllerErrorPath, _thresholdNoChangeInMs);
}
- void scanPath(String path, int thresholdNoChangeInMs) {
- logger.info("Scanning path " + path);
- List<String> subPaths = _zkClient.getChildren(path);
- for (String subPath : subPaths) {
- try {
- String nextPath = path + "/" + subPath;
- List<String> subSubPaths = _zkClient.getChildren(nextPath);
- for (String subsubPath : subSubPaths) {
- try {
- checkAndDump(nextPath + "/" + subsubPath, thresholdNoChangeInMs);
- } catch (Exception e) {
- logger.error(e);
- }
- }
- } catch (Exception e) {
- logger.error(e);
+ /**
+ * Find paths of all leaf nodes under an ancestor path (exclusive)
+ * @param accessor
+ * @param ancestorPath
+ * @return a list of paths
+ */
+ static List<String> scanPath(BaseDataAccessor<ZNRecord> accessor, String ancestorPath) {
+ List<String> queue = Lists.newLinkedList();
+ queue.add(ancestorPath);
+
+ // BFS
+ List<String> leafPaths = Lists.newArrayList();
+ while (!queue.isEmpty()) {
+ String path = queue.remove(0);
+ List<String> childNames = accessor.getChildNames(path, 0);
+ if (childNames == null) {
+ // path doesn't exist
+ continue;
+ }
+ if (childNames.isEmpty() && !path.equals(ancestorPath)) {
+ // leaf node, excluding ancestorPath
+ leafPaths.add(path);
+ }
+ for (String childName : childNames) {
+ String subPath = String.format("%s/%s", path, childName);
+ queue.add(subPath);
}
}
+ return leafPaths;
}
- void checkAndDump(String path, int thresholdNoChangeInMs) {
- List<String> subPaths = _zkClient.getChildren(path);
- if (subPaths.size() == 0) {
- subPaths.add("");
+ void dump(BaseDataAccessor<ZNRecord> accessor, String ancestorPath, int threshold) {
+ List<String> leafPaths = scanPath(accessor, ancestorPath);
+ if (leafPaths.isEmpty()) {
+ return;
+ }
+
+ Stat[] stats = accessor.getStats(leafPaths, 0);
+ List<String> dumpPaths = Lists.newArrayList();
+ long now = System.currentTimeMillis();
+ for (int i = 0; i < stats.length; i++) {
+ Stat stat = stats[i];
+ if ((now - stat.getMtime()) > threshold) {
+ dumpPaths.add(leafPaths.get(i));
+ }
}
- for (String subPath : subPaths) {
- String fullPath = subPath.length() > 0 ? path + "/" + subPath : path;
- Stat pathStat = _zkClient.getStat(fullPath);
-
- long lastModifiedTimeInMs = pathStat.getMtime();
- long nowInMs = new Date().getTime();
- // logger.info(nowInMs + " " + lastModifiedTimeInMs + " " + fullPath);
-
- // Check the last modified time
- if (nowInMs > lastModifiedTimeInMs) {
- long timeDiff = nowInMs - lastModifiedTimeInMs;
- if (timeDiff > thresholdNoChangeInMs) {
- logger.info("Dumping status update path " + fullPath + " " + timeDiff + "MS has passed");
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- ZNRecord record = _zkClient.readData(fullPath);
-
- // dump the node content into log file
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
- StringWriter sw = new StringWriter();
- try {
- mapper.writeValue(sw, record);
- logger.info(sw.toString());
- } catch (Exception e) {
- logger
- .warn(
- "Exception during serialization in ZKPathDataDumpTask.checkAndDump. This can mostly be ignored",
- e);
- }
- // Delete the leaf data
- _zkClient.deleteRecursive(fullPath);
- }
+
+ // dump
+ LOG.info("Dump statusUpdates and errors records for pahts: " + dumpPaths);
+ List<ZNRecord> dumpRecords = accessor.get(dumpPaths, null, 0);
+ for (ZNRecord record : dumpRecords) {
+ if (record != null) {
+ LOG.info(new String(_jsonSerializer.serialize(record)));
}
}
+
+ // clean up
+ accessor.remove(dumpPaths, 0);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 9e2452b..a67115b 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -46,7 +46,6 @@ import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.zookeeper.data.Stat;
-import org.omg.CORBA._PolicyStub;
public class Mocks {
public static class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
index f6a7098..61fd85b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
@@ -21,6 +21,8 @@ package org.apache.helix.integration;
import java.util.Date;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
@@ -30,9 +32,10 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockJobIntf;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.participant.CustomCodeCallbackHandler;
import org.apache.helix.participant.HelixCustomCodeRunner;
@@ -62,30 +65,20 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
}
- class MockJob implements MockJobIntf {
- @Override
- public void doPreConnectJob(HelixManager manager) {
- try {
- // delay the start of the 1st participant
- // so there will be a leadership transfer from localhost_12919 to 12918
- if (manager.getInstanceName().equals("localhost_12918")) {
- Thread.sleep(2000);
- }
-
- HelixCustomCodeRunner customCodeRunner = new HelixCustomCodeRunner(manager, ZK_ADDR);
- customCodeRunner.invoke(_callback).on(ChangeType.LIVE_INSTANCE)
- .usingLeaderStandbyModel("TestParticLeader").start();
- } catch (Exception e) {
- LOG.error("Exception do pre-connect job", e);
+ private void registerCustomCodeRunner(HelixManager manager) {
+ try {
+ // delay the start of the 1st participant
+ // so there will be a leadership transfer from localhost_12919 to 12918
+ if (manager.getInstanceName().equals("localhost_12918")) {
+ Thread.sleep(2000);
}
- }
-
- @Override
- public void doPostConnectJob(HelixManager manager) {
- // TODO Auto-generated method stub
+ HelixCustomCodeRunner customCodeRunner = new HelixCustomCodeRunner(manager, ZK_ADDR);
+ customCodeRunner.invoke(_callback).on(ChangeType.LIVE_INSTANCE)
+ .usingLeaderStandbyModel("TestParticLeader").start();
+ } catch (Exception e) {
+ LOG.error("Exception do pre-connect job", e);
}
-
}
@Test
@@ -109,10 +102,9 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
for (int i = 0; i < _nodeNb; i++) {
String instanceName = "localhost_" + (_startPort + i);
- MockJob job = new MockJob();
participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
- job.doPreConnectJob(participants[i]);
+ registerCustomCodeRunner(participants[i]);
participants[i].syncStart();
}
boolean result =
@@ -125,9 +117,9 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
_callback._isCallbackInvoked = false;
// add a new live instance
- // ZkClient zkClient = new ZkClient(ZK_ADDR);
- // zkClient.setZkSerializer(new ZNRecordSerializer());
- ZKHelixDataAccessor accessor =
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.addInstance(_clusterName, new InstanceConfig("newLiveInstance"));
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 8c69a38..281b306 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -393,7 +393,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
}
Thread.sleep(3000);
- ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _gZkClient, 0);
+ ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, 0);
dumpTask.run();
subPaths = _gZkClient.getChildren(controllerStatusPath);
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 34efe34..917be17 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -28,7 +28,6 @@ import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
-import org.apache.helix.mock.participant.MockJobIntf;
import org.apache.helix.mock.participant.MockMSModelFactory;
import org.apache.helix.mock.participant.MockSchemataModelFactory;
import org.apache.helix.mock.participant.MockTransition;
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
index 31811bb..04207d6 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
@@ -122,30 +122,6 @@ public class MockHealthReportParticipant {
}
}
- static class MockHealthReportJob implements MockJobIntf {
-
- @Override
- public void doPreConnectJob(HelixManager manager) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void doPostConnectJob(HelixManager manager) {
- // TODO Auto-generated method stub
- manager.getHealthReportCollector().addHealthReportProvider(new MockHealthReportProvider());
-
- // // set property store path for perf test
- // final String setPath = "/TEST_PERF/set";
- // final String updatePath = "/TEST_PERF/update";
- // manager.getHelixPropertyStore().create(setPath, new ZNRecord(setPath),
- // BaseDataAccessor.Option.PERSISTENT);
- // manager.getHelixPropertyStore().set(updatePath, new ZNRecord(updatePath),
- // BaseDataAccessor.Option.PERSISTENT);
- }
-
- }
-
// hack OptionBuilder is not thread safe
@SuppressWarnings("static-access")
synchronized private static Options constructCommandLineOptions() {
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java
deleted file mode 100644
index 4b637a6..0000000
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.helix.mock.participant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixManager;
-
-public interface MockJobIntf {
- public void doPreConnectJob(HelixManager manager);
-
- public void doPostConnectJob(HelixManager manager);
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 58b691a..0981a2e 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -1,5 +1,24 @@
package org.apache.helix.monitoring;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.io.IOException;
import java.util.Date;
@@ -183,7 +202,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
// 1 participant goes away
// No change in instance/resource mbean
- // Unregister 1 per-instance resource mbean
+ // Unregister 1 per-instance resource mbean
Thread.sleep(1000);
Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 1);
Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
index 74b1a89..5f44b36 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
@@ -96,10 +96,8 @@ public class TestParticipantMonitor {
}
}
- @Test(groups = {
- "unitTest"
- })
- public void TestReportData() throws InstanceNotFoundException, MalformedObjectNameException,
+ @Test()
+ public void testReportData() throws InstanceNotFoundException, MalformedObjectNameException,
NullPointerException, IOException, InterruptedException {
System.out.println("START TestParticipantMonitor");
ParticipantMonitor monitor = new ParticipantMonitor();
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
index c35c961..7a4a941 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
@@ -23,10 +23,8 @@ import org.testng.AssertJUnit;
import org.testng.annotations.Test;
public class TestStatCollector {
- @Test(groups = {
- "unitTest"
- })
- public void TestCollectData() {
+ @Test()
+ public void testCollectData() {
StatCollector collector = new StatCollector();
int nPoints = 100;
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
new file mode 100644
index 0000000..a3d8ae3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
@@ -0,0 +1,113 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.model.Error;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestZKPathDataDumpTask extends ZkUnitTestBase {
+
+ @Test
+ public void test() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 1;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 2, // partitions per resource
+ n, // number of nodes
+ 1, // replicas
+ "MasterSlave", true); // do rebalance
+
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();
+
+ HelixManager manager = mock(HelixManager.class);
+ when(manager.getHelixDataAccessor()).thenReturn(accessor);
+ when(manager.getClusterName()).thenReturn(clusterName);
+
+ // run dump task without statusUpdates and errors, should not remove any existing statusUpdate/error paths
+ ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, 0);
+ task.run();
+ PropertyKey controllerStatusUpdateKey = keyBuilder.controllerTaskStatuses();
+ Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+ PropertyKey controllerErrorKey = keyBuilder.controllerTaskErrors();
+ Assert.assertTrue(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+ PropertyKey statusUpdateKey = keyBuilder.stateTransitionStatus("localhost_12918");
+ Assert.assertTrue(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+ PropertyKey errorKey = keyBuilder.stateTransitionErrors("localhost_12918");
+
+ // add participant status updates and errors
+ statusUpdateKey =
+ keyBuilder.stateTransitionStatus("localhost_12918", "session_0", "TestDB0", "TestDB0_0");
+ accessor.setProperty(statusUpdateKey, new StatusUpdate(new ZNRecord("statusUpdate")));
+ errorKey =
+ keyBuilder.stateTransitionError("localhost_12918", "session_0", "TestDB0", "TestDB0_0");
+ accessor.setProperty(errorKey, new Error(new ZNRecord("error")));
+
+ // add controller status updates and errors
+ controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB");
+ accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord("controllerStatusUpdate")));
+ controllerErrorKey = keyBuilder.controllerTaskError("TestDB_error");
+ accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError")));
+
+ // run dump task, should remove existing statusUpdate/error paths
+ task.run();
+ Assert.assertFalse(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+ Assert.assertFalse(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+ Assert.assertFalse(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+ Assert.assertFalse(baseAccessor.exists(errorKey.getPath(), 0));
+
+ controllerStatusUpdateKey = keyBuilder.controllerTaskStatuses();
+ Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+ controllerErrorKey = keyBuilder.controllerTaskErrors();
+ Assert.assertTrue(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+ statusUpdateKey = keyBuilder.stateTransitionStatus("localhost_12918");
+ Assert.assertTrue(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+ errorKey = keyBuilder.stateTransitionErrors("localhost_12918");
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAlertItemMBeanCollection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAlertItemMBeanCollection.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAlertItemMBeanCollection.java
index e383291..d2e5395 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAlertItemMBeanCollection.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAlertItemMBeanCollection.java
@@ -22,7 +22,6 @@ package org.apache.helix.monitoring.mbeans;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import javax.management.AttributeNotFoundException;
@@ -32,14 +31,12 @@ import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ReflectionException;
-import org.apache.helix.ZNRecord;
import org.apache.helix.alerts.AlertValueAndStatus;
import org.apache.helix.alerts.Tuple;
import org.apache.helix.healthcheck.TestWildcardAlert.TestClusterMBeanObserver;
import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -47,7 +44,7 @@ public class TestClusterAlertItemMBeanCollection {
private static final Logger _logger = Logger.getLogger(TestClusterAlertItemMBeanCollection.class);
@Test
- public void TestAlertReportingHistory() throws InstanceNotFoundException,
+ public void testAlertReportingHistory() throws InstanceNotFoundException,
MalformedObjectNameException, NullPointerException, IOException, IntrospectionException,
AttributeNotFoundException, ReflectionException, MBeanException {
ClusterAlertMBeanCollection beanCollection = new ClusterAlertMBeanCollection();
@@ -226,7 +223,7 @@ public class TestClusterAlertItemMBeanCollection {
}
@Test
- public void TestAlertRefresh() throws InstanceNotFoundException, MalformedObjectNameException,
+ public void testAlertRefresh() throws InstanceNotFoundException, MalformedObjectNameException,
NullPointerException, IOException, IntrospectionException, AttributeNotFoundException,
ReflectionException, MBeanException, InterruptedException {
ClusterAlertMBeanCollection beanCollection = new ClusterAlertMBeanCollection();
http://git-wip-us.apache.org/repos/asf/helix/blob/21296c95/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b14e164..bbebf55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -290,6 +290,11 @@ under the License.
<version>6.0.1</version>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ </dependency>
+ <dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.12</version>