You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/05/25 01:20:06 UTC
[helix] 32/44: Fix critical Task Framework throttle bug
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit cd821cd620b6821cbe3d4ccf7072efeb3b924d32
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Thu May 9 15:59:59 2019 -0700
Fix critical Task Framework throttle bug
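The task throttling feature had a logic bug: it did not count any pending task assignments, which broke task throttling. This diff fixes it by recording each resource's state model definition in CurrentStateOutput so that Task Framework pending partitions can be looked up.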
RB=1661127
BUG=HELIX-1875
G=helix-reviewers
A=jjwang
Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
.../org/apache/helix/ClusterMessagingService.java | 17 +++----
.../stages/CurrentStateComputationStage.java | 5 ++
.../controller/stages/CurrentStateOutput.java | 35 +++++++++-----
.../integration/manager/TestZkHelixAdmin.java | 44 +++++++++++++++++-
.../spectator/TestRoutingTableSnapshot.java | 3 +-
.../helix/manager/zk/TestZNRecordSizeLimit.java | 53 +++++++++++-----------
.../TestClusterStatusMonitorLifecycle.java | 49 ++++----------------
7 files changed, 117 insertions(+), 89 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
index 28188e0..96a5957 100644
--- a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
@@ -37,7 +37,7 @@ import org.apache.helix.model.Message;
public interface ClusterMessagingService {
/**
* Send message matching the specifications mentioned in recipientCriteria.
- * @param recipientCriteria criteria to be met, defined as {@link Criteria}
+ * @param receipientCriteria criteria to be met, defined as {@link Criteria}
* @See Criteria
* @param message
* message to be sent. Some attributes of this message will be
@@ -55,24 +55,24 @@ public interface ClusterMessagingService {
* This is useful when message need to be sent and current thread need not
* wait for response since processing will be done in another thread.
* @see #send(Criteria, Message)
- * @param recipientCriteria
+ * @param receipientCriteria
* @param message
* @param callbackOnReply callback to trigger on completion
* @param timeOut Time to wait before failing the send
* @return the number of messages that were successfully sent
*/
- int send(Criteria recipientCriteria, Message message, AsyncCallback callbackOnReply, int timeOut);
+ int send(Criteria receipientCriteria, Message message, AsyncCallback callbackOnReply, int timeOut);
/**
* @see #send(Criteria, Message, AsyncCallback, int)
- * @param recipientCriteria
+ * @param receipientCriteria
* @param message
* @param callbackOnReply
* @param timeOut
* @param retryCount maximum number of times to retry the send
* @return the number of messages that were successfully sent
*/
- int send(Criteria recipientCriteria, Message message, AsyncCallback callbackOnReply,
+ int send(Criteria receipientCriteria, Message message, AsyncCallback callbackOnReply,
int timeOut, int retryCount);
/**
@@ -86,13 +86,14 @@ public interface ClusterMessagingService {
* The current thread can use callbackOnReply instance to store application
* specific data.
* @see #send(Criteria, Message, AsyncCallback, int)
- * @param recipientCriteria
+ * @param receipientCriteria
* @param message
* @param callbackOnReply
* @param timeOut
+ * @param retryCount
* @return the number of messages that were successfully sent
*/
- int sendAndWait(Criteria recipientCriteria, Message message, AsyncCallback callbackOnReply,
+ int sendAndWait(Criteria receipientCriteria, Message message, AsyncCallback callbackOnReply,
int timeOut);
/**
@@ -143,7 +144,7 @@ public interface ClusterMessagingService {
/**
* This will generate all messages to be sent given the recipientCriteria and MessageTemplate,
* the messages are not sent.
- * @param recipientCriteria criteria to be met, defined as {@link Criteria}
+ * @param receipientCriteria criteria to be met, defined as {@link Criteria}
* @param messageTemplate the Message on which to base the messages to send
* @return messages to be sent, grouped by the type of instance to send the message to
*/
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 72d3688..0bf4d28 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -121,6 +121,11 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
}
}
}
+
+ // Add the state model into the map for lookup of Task Framework pending partitions
+ if (resource.getStateModelDefRef() != null) {
+ currentStateOutput.setResourceStateModelDef(resourceName, resource.getStateModelDefRef());
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index b634703..13e1dbf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -110,8 +110,8 @@ public class CurrentStateOutput {
_currentStateMap.get(resourceName).get(partition).put(instanceName, state);
}
- public void setEndTime(String resourceName, Partition partition,
- String instanceName, Long timestamp) {
+ public void setEndTime(String resourceName, Partition partition, String instanceName,
+ Long timestamp) {
if (!_currentStateEndTimeMap.containsKey(resourceName)) {
_currentStateEndTimeMap.put(resourceName, new HashMap<Partition, Map<String, Long>>());
}
@@ -193,8 +193,7 @@ public class CurrentStateOutput {
return null;
}
- public Long getEndTime(String resourceName, Partition partition,
- String instanceName) {
+ public Long getEndTime(String resourceName, Partition partition, String instanceName) {
Map<Partition, Map<String, Long>> partitionInfo = _currentStateEndTimeMap.get(resourceName);
if (partitionInfo != null) {
Map<String, Long> instanceInfo = partitionInfo.get(partition);
@@ -279,7 +278,7 @@ public class CurrentStateOutput {
*/
public Map<Partition, Map<String, String>> getCurrentStateMap(String resourceName) {
if (_currentStateMap.containsKey(resourceName)) {
- return _currentStateMap.get(resourceName);
+ return _currentStateMap.get(resourceName);
}
return Collections.emptyMap();
}
@@ -356,32 +355,43 @@ public class CurrentStateOutput {
}
/**
- * Get the partitions count for each participant with the pending state and given resource state model
+ * Get the partitions count for each participant with the pending state and given resource state
+ * model
* @param resourceStateModel specified resource state model to look up
* @param state specified pending resource state to look up
* @return set of participants to partitions mapping
*/
- public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
+ public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel,
+ String state) {
return getPartitionCountWithState(resourceStateModel, state, (Map) _pendingMessageMap);
}
/**
- * Get the partitions count for each participant in the current state and with given resource state model
+ * Get the partitions count for each participant in the current state and with given resource
+ * state model
* @param resourceStateModel specified resource state model to look up
* @param state specified current resource state to look up
* @return set of participants to partitions mapping
*/
- public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel, String state) {
+ public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel,
+ String state) {
return getPartitionCountWithState(resourceStateModel, state, (Map) _currentStateMap);
}
+ /**
+ * Count partitions in pendingStates and currentStates.
+ * @param resourceStateModel
+ * @param state
+ * @param stateMap
+ * @return
+ */
private Map<String, Integer> getPartitionCountWithState(String resourceStateModel, String state,
Map<String, Map<Partition, Map<String, Object>>> stateMap) {
Map<String, Integer> currentPartitionCount = new HashMap<>();
for (String resource : stateMap.keySet()) {
String stateModel = _resourceStateModelMap.get(resource);
- if ((stateModel != null && stateModel.equals(resourceStateModel)) || (stateModel == null
- && resourceStateModel == null)) {
+ if ((stateModel != null && stateModel.equals(resourceStateModel))
+ || (stateModel == null && resourceStateModel == null)) {
for (Partition partition : stateMap.get(resource).keySet()) {
Map<String, Object> partitionMessage = stateMap.get(resource).get(partition);
for (Map.Entry<String, Object> participantMap : partitionMessage.entrySet()) {
@@ -399,7 +409,8 @@ public class CurrentStateOutput {
currState = curStateObj.toString();
}
}
- if ((currState != null && currState.equals(state)) || (currState == null && state == null)) {
+ if ((currState != null && currState.equals(state))
+ || (currState == null && state == null)) {
currentPartitionCount.put(participant, currentPartitionCount.get(participant) + 1);
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
index 5141a8d..0dfdfb4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
@@ -19,16 +19,20 @@ package org.apache.helix.integration.manager;
* under the License.
*/
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.integration.task.MockTask;
import org.apache.helix.integration.task.TaskTestBase;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
@@ -57,6 +61,26 @@ public class TestZkHelixAdmin extends TaskTestBase {
}
@Test
+ public void testViewClusterOperations() {
+ String testCluster = "testViewCluster";
+ List<ViewClusterSourceConfig> sourceConfigs = generateViewClusterSourceConfig();
+ int refreshPeriod = 10;
+
+ _admin.addCluster(testCluster);
+ ClusterConfig config = _configAccessor.getClusterConfig(testCluster);
+ config.setViewCluster();
+ config.setViewClusterRefreshPeriod(refreshPeriod);
+ config.setViewClusterSourceConfigs(sourceConfigs);
+ _configAccessor.setClusterConfig(testCluster, config);
+
+ ClusterConfig fetchedConfig = _configAccessor.getClusterConfig(testCluster);
+ Assert.assertTrue(fetchedConfig.isViewCluster());
+ Assert.assertEquals(fetchedConfig.getViewClusterSourceConfigs().size(), sourceConfigs.size());
+ Assert.assertEquals(fetchedConfig.getViewClusterRefershPeriod(), refreshPeriod);
+ _admin.dropCluster(testCluster);
+ }
+
+ @Test
public void testEnableDisablePartitions() throws InterruptedException {
_admin.enablePartition(false, CLUSTER_NAME, (PARTICIPANT_PREFIX + "_" + _startPort),
WorkflowGenerator.DEFAULT_TGT_DB, Arrays.asList(new String[] { "TestDB_0", "TestDB_2" }));
@@ -88,4 +112,22 @@ public class TestZkHelixAdmin extends TaskTestBase {
Assert.assertEquals(jobContext.getPartitionState(1), TaskPartitionState.COMPLETED);
Assert.assertEquals(jobContext.getPartitionState(2), null);
}
-}
\ No newline at end of file
+
+ private List<ViewClusterSourceConfig> generateViewClusterSourceConfig() {
+ String clusterNamePrefix = "mySourceCluster";
+ String zkConnection = "zookeeper.test.com:2121";
+ String testJsonTemplate =
+ "{\"name\": \"%s\", \"zkAddress\": \"%s\", \"properties\": [\"%s\", \"%s\", \"%s\"]}";
+
+ List<ViewClusterSourceConfig> sourceConfigs = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ String clusterName = clusterNamePrefix + i;
+ String configJSON = String
+ .format(testJsonTemplate, clusterName, zkConnection, PropertyType.INSTANCES.name(),
+ PropertyType.EXTERNALVIEW.name(), PropertyType.LIVEINSTANCES.name());
+
+ sourceConfigs.add(ViewClusterSourceConfig.fromJson(configJSON));
+ }
+ return sourceConfigs;
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
index 3b498c3..216c900 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
@@ -132,4 +132,5 @@ public class TestRoutingTableSnapshot extends ZkTestBase {
Assert.assertEquals(slaveInsEv.size(), 2);
}
}
-}
\ No newline at end of file
+}
+
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
index bccb425..36e26e7 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
@@ -81,23 +81,25 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
_gZkClient.createPersistent(path2, true);
try {
_gZkClient.writeData(path2, largeRecord);
+ Assert.fail("Should fail because data size is larger than 1M");
} catch (HelixException e) {
- Assert.fail("Should not fail because data size is larger than 1M since compression applied");
+ // OK
}
record = _gZkClient.readData(path2);
- Assert.assertNotNull(record);
+ Assert.assertNull(record);
// oversized write doesn't overwrite existing data on zk
record = _gZkClient.readData(path1);
try {
_gZkClient.writeData(path1, largeRecord);
+ Assert.fail("Should fail because data size is larger than 1M");
} catch (HelixException e) {
- Assert.fail("Should not fail because data size is larger than 1M since compression applied");
+ // OK
}
ZNRecord recordNew = _gZkClient.readData(path1);
byte[] arr = serializer.serialize(record);
byte[] arrNew = serializer.serialize(recordNew);
- Assert.assertFalse(Arrays.equals(arr, arrNew));
+ Assert.assertTrue(Arrays.equals(arr, arrNew));
// test ZkDataAccessor
ZKHelixAdmin admin = new ZKHelixAdmin(_gZkClient);
@@ -119,7 +121,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
idealState.getRecord().setSimpleField(i + "", bufStr);
}
boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
- Assert.assertTrue(succeed);
+ Assert.assertFalse(succeed);
HelixProperty property =
accessor.getProperty(keyBuilder.stateTransitionStatus("localhost_12918", "session_1",
"partition_1"));
@@ -149,11 +151,11 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
}
// System.out.println("record: " + idealState.getRecord());
succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
- Assert.assertTrue(succeed);
+ Assert.assertFalse(succeed);
recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
arr = serializer.serialize(record);
arrNew = serializer.serialize(recordNew);
- Assert.assertFalse(Arrays.equals(arr, arrNew));
+ Assert.assertTrue(Arrays.equals(arr, arrNew));
System.out.println("END testZNRecordSizeLimitUseZNRecordSerializer at "
+ new Date(System.currentTimeMillis()));
@@ -162,12 +164,12 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
@Test
public void testZNRecordSizeLimitUseZNRecordStreamingSerializer() {
String className = getShortClassName();
- System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new Date(
- System.currentTimeMillis()));
+ System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
+ + new Date(System.currentTimeMillis()));
ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
- HelixZkClient zkClient = SharedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+ HelixZkClient zkClient =
+ SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
try {
zkClient.setZkSerializer(serializer);
@@ -205,25 +207,25 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
zkClient.createPersistent(path2, true);
try {
zkClient.writeData(path2, largeRecord);
+ Assert.fail("Should fail because data size is larger than 1M");
} catch (HelixException e) {
- Assert
- .fail("Should not fail because data size is larger than 1M since compression applied");
+ // OK
}
record = zkClient.readData(path2);
- Assert.assertNotNull(record);
+ Assert.assertNull(record);
// oversized write doesn't overwrite existing data on zk
record = zkClient.readData(path1);
try {
zkClient.writeData(path1, largeRecord);
+ Assert.fail("Should fail because data size is larger than 1M");
} catch (HelixException e) {
- Assert
- .fail("Should not fail because data size is larger than 1M since compression applied");
+ // OK
}
ZNRecord recordNew = zkClient.readData(path1);
byte[] arr = serializer.serialize(record);
byte[] arrNew = serializer.serialize(recordNew);
- Assert.assertFalse(Arrays.equals(arr, arrNew));
+ Assert.assertTrue(Arrays.equals(arr, arrNew));
// test ZkDataAccessor
ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
@@ -232,8 +234,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
admin.addInstance(className, instanceConfig);
// oversized data should not create any new data on zk
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
Builder keyBuilder = accessor.keyBuilder();
// ZNRecord statusUpdates = new ZNRecord("statusUpdates");
@@ -246,9 +247,9 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
idealState.getRecord().setSimpleField(i + "", bufStr);
}
boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB_1"), idealState);
- Assert.assertTrue(succeed);
+ Assert.assertFalse(succeed);
HelixProperty property = accessor.getProperty(keyBuilder.idealStates("TestDB_1"));
- Assert.assertNotNull(property);
+ Assert.assertNull(property);
// legal sized data gets written to zk
idealState.getRecord().getSimpleFields().clear();
@@ -275,18 +276,16 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
}
// System.out.println("record: " + idealState.getRecord());
succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB_2"), idealState);
- Assert.assertTrue(succeed);
+ Assert.assertFalse(succeed);
recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
arr = serializer.serialize(record);
arrNew = serializer.serialize(recordNew);
- Assert.assertFalse(Arrays.equals(arr, arrNew));
- } catch (HelixException ex) {
- Assert.fail("Should not fail because data size is larger than 1M since compression applied");
+ Assert.assertTrue(Arrays.equals(arr, arrNew));
} finally {
zkClient.close();
}
- System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new Date(
- System.currentTimeMillis()));
+ System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
+ + new Date(System.currentTimeMillis()));
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 6156666..f84faf5 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -19,26 +19,21 @@ package org.apache.helix.monitoring;
* under the License.
*/
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Set;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.MBeanServerNotification;
-import javax.management.MalformedObjectNameException;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.Query;
import javax.management.QueryExp;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.IdealState;
-import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
@@ -67,8 +62,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
String className = TestHelper.getTestClassName();
_clusterNamePrefix = className;
- System.out
- .println("START " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+ System.out.println("START " + _clusterNamePrefix + " at "
+ + new Date(System.currentTimeMillis()));
// setup 10 clusters
for (int i = 0; i < clusterNb; i++) {
@@ -97,7 +92,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
clusterNb, // partitions per resource
n, // number of nodes
3, // replicas
- "LeaderStandby", true); // do rebalance
+ "LeaderStandby",
+ true); // do rebalance
// start distributed cluster controllers
_controllers = new ClusterDistributedController[n + n];
@@ -177,32 +173,6 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
System.out.println("END " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
}
- class ParticipantMonitorListener extends ClusterMBeanObserver {
-
- int _nMbeansUnregistered = 0;
- int _nMbeansRegistered = 0;
-
- public ParticipantMonitorListener(String domain)
- throws InstanceNotFoundException, IOException, MalformedObjectNameException,
- NullPointerException {
- super(domain);
- }
-
- @Override
- public void onMBeanRegistered(MBeanServerConnection server,
- MBeanServerNotification mbsNotification) {
- LOG.info("Register mbean: " + mbsNotification.getMBeanName());
- _nMbeansRegistered++;
- }
-
- @Override
- public void onMBeanUnRegistered(MBeanServerConnection server,
- MBeanServerNotification mbsNotification) {
- LOG.info("Unregister mbean: " + mbsNotification.getMBeanName());
- _nMbeansUnregistered++;
- }
- }
-
private void cleanupControllers() {
for (int i = 0; i < _controllers.length; i++) {
if (_controllers[i] != null && _controllers[i].isConnected()) {
@@ -326,7 +296,6 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
cleanupControllers();
// Check if any MBeans leftover.
// Note that MessageQueueStatus is not bound with controller only. So it will still exist.
-
final QueryExp exp2 = Query.and(
Query.not(Query.match(Query.attr("SensorName"), Query.value("MessageQueueStatus.*"))),
exp1);