You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2018/11/16 22:43:23 UTC
[geode] 07/11: GEODE-2644: Expand DistributedSystemMXBean tests
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 6709a326e2d16661e34c179cb81b6a46209b2812
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Nov 2 16:49:49 2018 -0700
GEODE-2644: Expand DistributedSystemMXBean tests
Extract these tests from DistributedSystemDUnitTest:
* DistributedSystemMXBeanDistributedTest
* DistributedSystemMXBeanWithAlertsDistributedTest
* DistributedSystemMXBeanWithNotificationsDistributedTest
Add new DistributedSystemMXBeanIntegrationTest.
Add many new Alert notification tests.
---
.../management/DistributedSystemDUnitTest.java | 575 ---------------------
.../DistributedSystemMXBeanDistributedTest.java | 202 ++++++++
...butedSystemMXBeanWithAlertsDistributedTest.java | 495 ++++++++++++++++++
...stemMXBeanWithNotificationsDistributedTest.java | 243 +++++++++
.../DistributedSystemMXBeanIntegrationTest.java | 93 +++-
5 files changed, 1007 insertions(+), 601 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemDUnitTest.java
deleted file mode 100644
index 8fae2ea..0000000
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemDUnitTest.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management;
-
-import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
-import static org.apache.geode.management.internal.MBeanJMXAdapter.getDistributedSystemName;
-import static org.apache.geode.management.internal.MBeanJMXAdapter.getMemberMBeanName;
-import static org.apache.geode.management.internal.MBeanJMXAdapter.getMemberNameOrId;
-import static org.apache.geode.test.dunit.Host.getHost;
-import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
-import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.management.ListenerNotFoundException;
-import javax.management.Notification;
-import javax.management.NotificationBroadcasterSupport;
-import javax.management.NotificationFilter;
-import javax.management.NotificationListener;
-import javax.management.ObjectName;
-
-import org.apache.logging.log4j.Logger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.internal.admin.Alert;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.log4j.AlertAppender;
-import org.apache.geode.management.internal.AlertDetails;
-import org.apache.geode.management.internal.ManagementConstants;
-import org.apache.geode.management.internal.NotificationHub;
-import org.apache.geode.management.internal.NotificationHub.NotificationHubListener;
-import org.apache.geode.management.internal.SystemManagementService;
-import org.apache.geode.management.internal.beans.MemberMBean;
-import org.apache.geode.management.internal.beans.SequenceNumber;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.VM;
-
-/**
- * DistributedSystemMXBean tests:
- *
- * <p>
- * a) For all the notifications
- * <ul>
- * <li>i) gemfire.distributedsystem.member.joined
- * <li>ii) gemfire.distributedsystem.member.left
- * <li>iii) gemfire.distributedsystem.member.suspect
- * <li>iv) All notifications emitted by member mbeans
- * <li>v) Alerts
- * </ul>
- *
- * <p>
- * b) Concurrently modify proxy list by removing member and accessing the distributed system MBean
- *
- * <p>
- * c) Aggregate Operations like shutDownAll
- *
- * <p>
- * d) Member level operations like fetchJVMMetrics()
- *
- * <p>
- * e ) Statistics
- *
- * <p>
- * TODO: break up the large tests into smaller tests
- */
-@SuppressWarnings("serial,unused")
-public class DistributedSystemDUnitTest implements Serializable {
-
- private static final Logger logger = LogService.getLogger();
-
- private static final String WARNING_LEVEL_MESSAGE = "Warning Level Alert Message";
- private static final String SEVERE_LEVEL_MESSAGE = "Severe Level Alert Message";
-
- private static volatile List<Notification> notifications;
- private static volatile Map<ObjectName, NotificationListener> notificationListenerMap;
-
- @Manager
- private VM managerVM;
-
- @Member
- private VM[] memberVMs;
-
- @Rule
- public ManagementTestRule managementTestRule = ManagementTestRule.builder().build();
-
- @Before
- public void before() throws Exception {
- notifications = Collections.synchronizedList(new ArrayList<>());
- notificationListenerMap = Collections.synchronizedMap(new HashMap<>());
- invokeInEveryVM(() -> notifications = Collections.synchronizedList(new ArrayList<>()));
- invokeInEveryVM(() -> notificationListenerMap = Collections.synchronizedMap(new HashMap<>()));
- }
-
- @After
- public void after() {
- resetAlertCounts(managerVM);
-
- notifications = null;
- notificationListenerMap = null;
- invokeInEveryVM(() -> notifications = null);
- invokeInEveryVM(() -> notificationListenerMap = null);
- }
-
- /**
- * Tests each and every operations that is defined on the MemberMXBean
- */
- @Test
- public void testDistributedSystemAggregate() {
- managementTestRule.createManager(managerVM);
- addNotificationListener(managerVM);
-
- for (VM memberVM : memberVMs) {
- managementTestRule.createMember(memberVM);
- }
-
- verifyDistributedSystemMXBean(managerVM);
- }
-
- /**
- * Tests each and every operations that is defined on the MemberMXBean
- */
- @Test
- public void testAlertManagedNodeFirst() {
- for (VM memberVM : memberVMs) {
- managementTestRule.createMember(memberVM);
- generateWarningAlert(memberVM);
- generateSevereAlert(memberVM);
- }
-
- managementTestRule.createManager(managerVM);
- addAlertListener(managerVM);
- verifyAlertCount(managerVM, 0, 0);
-
- DistributedMember managerDistributedMember =
- managementTestRule.getDistributedMember(managerVM);
-
- // Before we start we need to ensure that the initial (implicit) SEVERE alert has propagated
- // everywhere.
- for (VM memberVM : memberVMs) {
- verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE);
- }
-
- setAlertLevel(managerVM, AlertDetails.getAlertLevelAsString(Alert.WARNING));
-
- for (VM memberVM : memberVMs) {
- verifyAlertAppender(memberVM, managerDistributedMember, Alert.WARNING);
- generateWarningAlert(memberVM);
- generateSevereAlert(memberVM);
- }
-
- verifyAlertCount(managerVM, 3, 3);
- resetAlertCounts(managerVM);
-
- setAlertLevel(managerVM, AlertDetails.getAlertLevelAsString(Alert.SEVERE));
-
- for (VM memberVM : memberVMs) {
- verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE);
- generateWarningAlert(memberVM);
- generateSevereAlert(memberVM);
- }
-
- verifyAlertCount(managerVM, 3, 0);
- }
-
- /**
- * Tests each and every operations that is defined on the MemberMXBean
- */
- @Test
- public void testShutdownAll() {
- VM memberVM1 = getHost(0).getVM(0);
- VM memberVM2 = getHost(0).getVM(1);
- VM memberVM3 = getHost(0).getVM(2);
-
- VM managerVM = getHost(0).getVM(3);
-
- // managerVM Node is created first
- managementTestRule.createManager(managerVM);
-
- managementTestRule.createMember(memberVM1);
- managementTestRule.createMember(memberVM2);
- managementTestRule.createMember(memberVM3);
-
- shutDownAll(managerVM);
- }
-
- @Test
- public void testNavigationAPIS() {
- managementTestRule.createManager(managerVM);
-
- for (VM memberVM : memberVMs) {
- managementTestRule.createMember(memberVM);
- }
-
- verifyFetchMemberObjectName(managerVM, memberVMs.length + 1);
- }
-
- @Test
- public void testNotificationHub() {
- managementTestRule.createMembers();
- managementTestRule.createManagers();
-
- class NotificationHubTestListener implements NotificationListener {
-
- @Override
- public synchronized void handleNotification(Notification notification, Object handback) {
- logger.info("Notification received {}", notification);
- notifications.add(notification);
- }
- }
-
- managerVM.invoke("addListenerToMemberMXBean", () -> {
- ManagementService service = managementTestRule.getManagementService();
- DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
-
- GeodeAwaitility.await().untilAsserted(
- () -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(5));
-
- for (ObjectName objectName : distributedSystemMXBean.listMemberObjectNames()) {
- NotificationHubTestListener listener = new NotificationHubTestListener();
- getPlatformMBeanServer().addNotificationListener(objectName, listener, null, null);
- notificationListenerMap.put(objectName, listener);
- }
- });
-
- // Check in all VMS
-
- for (VM memberVM : memberVMs) {
- memberVM.invoke("checkNotificationHubListenerCount", () -> {
- SystemManagementService service = managementTestRule.getSystemManagementService();
- NotificationHub notificationHub = service.getNotificationHub();
- Map<ObjectName, NotificationHubListener> listenerMap =
- notificationHub.getListenerObjectMap();
- assertThat(listenerMap.keySet()).hasSize(1);
-
- ObjectName memberMBeanName =
- getMemberMBeanName(managementTestRule.getDistributedMember());
- NotificationHubListener listener = listenerMap.get(memberMBeanName);
-
- /*
- * Counter of listener should be 2 . One for default Listener which is added for each member
- * mbean by distributed system mbean One for the added listener in test
- */
- assertThat(listener.getNumCounter()).isEqualTo(2);
-
- // Raise some notifications
-
- NotificationBroadcasterSupport notifier = (MemberMBean) service.getMemberMXBean();
- String memberSource = getMemberNameOrId(managementTestRule.getDistributedMember());
-
- // Only a dummy notification , no actual region is created
- Notification notification = new Notification(JMXNotificationType.REGION_CREATED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.REGION_CREATED_PREFIX + "/test");
- notifier.sendNotification(notification);
- });
- }
-
- managerVM.invoke("checkNotificationsAndRemoveListeners", () -> {
- GeodeAwaitility.await().untilAsserted(() -> assertThat(notifications).hasSize(3));
-
- notifications.clear();
-
- for (ObjectName objectName : notificationListenerMap.keySet()) {
- NotificationListener listener = notificationListenerMap.get(objectName);
- getPlatformMBeanServer().removeNotificationListener(objectName, listener);
- }
- });
-
- // Check in all VMS again
-
- for (VM memberVM : memberVMs) {
- memberVM.invoke("checkNotificationHubListenerCountAgain", () -> {
- SystemManagementService service = managementTestRule.getSystemManagementService();
- NotificationHub hub = service.getNotificationHub();
- Map<ObjectName, NotificationHubListener> listenerObjectMap = hub.getListenerObjectMap();
- assertThat(listenerObjectMap.keySet().size()).isEqualTo(1);
-
- ObjectName memberMBeanName =
- getMemberMBeanName(managementTestRule.getDistributedMember());
- NotificationHubListener listener = listenerObjectMap.get(memberMBeanName);
-
- /*
- * Counter of listener should be 1 for the default Listener which is added for each member
- * mbean by distributed system mbean.
- */
- assertThat(listener.getNumCounter()).isEqualTo(1);
- });
- }
-
- managerVM.invoke("removeListenerFromMemberMXBean", () -> {
- ManagementService service = managementTestRule.getManagementService();
- DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
-
- GeodeAwaitility.await().untilAsserted(
- () -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(5));
-
- for (ObjectName objectName : distributedSystemMXBean.listMemberObjectNames()) {
- NotificationHubTestListener listener = new NotificationHubTestListener();
- try {
- getPlatformMBeanServer().removeNotificationListener(objectName, listener); // because new
- // instance!!
- } catch (ListenerNotFoundException e) {
- // TODO: [old] apparently there is never a notification listener on any these mbeans at
- // this point [fix this]
- // fix this test so it doesn't hit these unexpected exceptions -- getLogWriter().error(e);
- }
- }
- });
-
- for (VM memberVM : memberVMs) {
- memberVM.invoke("verifyNotificationHubListenersWereRemoved", () -> {
- SystemManagementService service = managementTestRule.getSystemManagementService();
- NotificationHub notificationHub = service.getNotificationHub();
- notificationHub.cleanUpListeners();
- assertThat(notificationHub.getListenerObjectMap()).isEmpty();
-
- for (ObjectName objectName : notificationListenerMap.keySet()) {
- NotificationListener listener = notificationListenerMap.get(objectName);
- assertThatThrownBy(
- () -> getPlatformMBeanServer().removeNotificationListener(objectName, listener))
- .isExactlyInstanceOf(ListenerNotFoundException.class);
- }
- });
- }
- }
-
- /**
- * Tests each and every operations that is defined on the MemberMXBean
- */
- @Test
- public void testAlert() {
- managementTestRule.createManager(managerVM);
- addAlertListener(managerVM);
- resetAlertCounts(managerVM);
-
- DistributedMember managerDistributedMember =
- managementTestRule.getDistributedMember(managerVM);
-
- generateWarningAlert(managerVM);
- generateSevereAlert(managerVM);
- verifyAlertCount(managerVM, 1, 0);
- resetAlertCounts(managerVM);
-
- for (VM memberVM : memberVMs) {
- managementTestRule.createMember(memberVM);
-
- verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE);
-
- generateWarningAlert(memberVM);
- generateSevereAlert(memberVM);
- }
-
- verifyAlertCount(managerVM, 3, 0);
- resetAlertCounts(managerVM);
- setAlertLevel(managerVM, AlertDetails.getAlertLevelAsString(Alert.WARNING));
-
- for (VM memberVM : memberVMs) {
- verifyAlertAppender(memberVM, managerDistributedMember, Alert.WARNING);
- generateWarningAlert(memberVM);
- generateSevereAlert(memberVM);
- }
-
- verifyAlertCount(managerVM, 3, 3);
-
- resetAlertCounts(managerVM);
-
- setAlertLevel(managerVM, AlertDetails.getAlertLevelAsString(Alert.OFF));
-
- for (VM memberVM : memberVMs) {
- verifyAlertAppender(memberVM, managerDistributedMember, Alert.OFF);
- generateWarningAlert(memberVM);
- generateSevereAlert(memberVM);
- }
-
- verifyAlertCount(managerVM, 0, 0);
- }
-
- private void verifyAlertAppender(final VM memberVM, final DistributedMember member,
- final int alertLevel) {
- memberVM.invoke("verifyAlertAppender",
- () -> GeodeAwaitility.await().untilAsserted(
- () -> assertThat(AlertAppender.getInstance().hasAlertListener(member, alertLevel))
- .isTrue()));
- }
-
- private void verifyAlertCount(final VM managerVM, final int expectedSevereAlertCount,
- final int expectedWarningAlertCount) {
- managerVM.invoke("verifyAlertCount", () -> {
- AlertNotificationListener listener = AlertNotificationListener.getInstance();
-
- GeodeAwaitility.await().untilAsserted(
- () -> assertThat(listener.getSevereAlertCount()).isEqualTo(expectedSevereAlertCount));
- GeodeAwaitility.await().untilAsserted(
- () -> assertThat(listener.getWarningAlertCount()).isEqualTo(expectedWarningAlertCount));
- });
- }
-
- private void setAlertLevel(final VM managerVM, final String alertLevel) {
- managerVM.invoke("setAlertLevel", () -> {
- ManagementService service = managementTestRule.getManagementService();
- DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
- distributedSystemMXBean.changeAlertLevel(alertLevel);
- });
- }
-
- private void generateWarningAlert(final VM anyVM) {
- anyVM.invoke("generateWarningAlert", () -> {
- IgnoredException ignoredException = addIgnoredException(WARNING_LEVEL_MESSAGE);
- logger.warn(WARNING_LEVEL_MESSAGE);
- ignoredException.remove();
- });
- }
-
- private void resetAlertCounts(final VM managerVM) {
- managerVM.invoke("resetAlertCounts", () -> {
- AlertNotificationListener listener = AlertNotificationListener.getInstance();
- listener.resetCount();
- });
- }
-
- private void generateSevereAlert(final VM anyVM) {
- anyVM.invoke("generateSevereAlert", () -> {
- IgnoredException ignoredException = addIgnoredException(SEVERE_LEVEL_MESSAGE);
- logger.fatal(SEVERE_LEVEL_MESSAGE);
- ignoredException.remove();
- });
- }
-
- private void addAlertListener(final VM managerVM) {
- managerVM.invoke("addAlertListener", () -> {
- AlertNotificationListener listener = AlertNotificationListener.getInstance();
- listener.resetCount();
-
- NotificationFilter notificationFilter = (Notification notification) -> notification.getType()
- .equals(JMXNotificationType.SYSTEM_ALERT);
-
- getPlatformMBeanServer().addNotificationListener(getDistributedSystemName(), listener,
- notificationFilter, null);
- });
- }
-
- /**
- * Check aggregate related functions and attributes
- */
- private void verifyDistributedSystemMXBean(final VM managerVM) {
- managerVM.invoke("verifyDistributedSystemMXBean", () -> {
- ManagementService service = managementTestRule.getManagementService();
- DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
-
- GeodeAwaitility.await()
- .untilAsserted(() -> assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5));
-
- Set<DistributedMember> otherMemberSet = managementTestRule.getOtherNormalMembers();
- for (DistributedMember member : otherMemberSet) {
- // TODO: create some assertions (this used to just print JVMMetrics and OSMetrics)
- }
- });
- }
-
- private void addNotificationListener(final VM managerVM) {
- managerVM.invoke("addNotificationListener", () -> {
- ManagementService service = managementTestRule.getManagementService();
- DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
- assertThat(distributedSystemMXBean).isNotNull();
-
- DistributedSystemNotificationListener listener = new DistributedSystemNotificationListener();
- getPlatformMBeanServer().addNotificationListener(getDistributedSystemName(), listener, null,
- null);
- });
- }
-
- private void shutDownAll(final VM managerVM) {
- managerVM.invoke("shutDownAll", () -> {
- ManagementService service = managementTestRule.getManagementService();
- DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
- distributedSystemMXBean.shutDownAllMembers();
-
- GeodeAwaitility.await().untilAsserted(
- () -> assertThat(managementTestRule.getOtherNormalMembers()).hasSize(0));
- });
- }
-
- private void verifyFetchMemberObjectName(final VM managerVM, final int memberCount) {
- managerVM.invoke("verifyFetchMemberObjectName", () -> {
- ManagementService service = managementTestRule.getManagementService();
- DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
-
- GeodeAwaitility.await().untilAsserted(
- () -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(memberCount));
-
- String memberId = managementTestRule.getDistributedMember().getId();
- ObjectName thisMemberName = getMemberMBeanName(memberId);
- ObjectName memberName = distributedSystemMXBean.fetchMemberObjectName(memberId);
- assertThat(memberName).isEqualTo(thisMemberName);
- });
- }
-
- private static class DistributedSystemNotificationListener implements NotificationListener {
-
- @Override
- public void handleNotification(final Notification notification, final Object handback) {
- assertThat(notification).isNotNull();
- }
- }
-
- private static class AlertNotificationListener implements NotificationListener {
-
- private static AlertNotificationListener listener = new AlertNotificationListener();
-
- private int warningAlertCount = 0;
-
- private int severeAlertCount = 0;
-
- static AlertNotificationListener getInstance() { // TODO: get rid of singleton
- return listener;
- }
-
- @Override
- public synchronized void handleNotification(final Notification notification,
- final Object handback) {
- assertThat(notification).isNotNull();
-
- Map<String, String> notificationUserData = (Map<String, String>) notification.getUserData();
-
- if (notificationUserData.get(JMXNotificationUserData.ALERT_LEVEL)
- .equalsIgnoreCase("warning")) {
- assertThat(notification.getMessage()).isEqualTo(WARNING_LEVEL_MESSAGE);
- warningAlertCount++;
- }
- if (notificationUserData.get(JMXNotificationUserData.ALERT_LEVEL)
- .equalsIgnoreCase("severe")) {
- assertThat(notification.getMessage()).isEqualTo(SEVERE_LEVEL_MESSAGE);
- severeAlertCount++;
- }
- }
-
- void resetCount() {
- warningAlertCount = 0;
- severeAlertCount = 0;
- }
-
- int getWarningAlertCount() {
- return warningAlertCount;
- }
-
- int getSevereAlertCount() {
- return severeAlertCount;
- }
- }
-}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java
new file mode 100644
index 0000000..21d99bf
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management;
+
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
+import static org.apache.geode.distributed.ConfigurationProperties.NAME;
+import static org.apache.geode.management.internal.MBeanJMXAdapter.getMemberMBeanName;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.VM.toArray;
+import static org.apache.geode.test.dunit.standalone.DUnitLauncher.getDistributedSystemProperties;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.management.ObjectName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.dunit.rules.SharedErrorCollector;
+import org.apache.geode.test.junit.categories.ManagementTest;
+
+/**
+ * Distributed tests for {@link DistributedSystemMXBean} operations and attributes.
+ *
+ * <pre>
+ * a) Concurrently modify proxy list by removing member and accessing the distributed system MBean
+ * b) Aggregate operations like shutDownAllMembers
+ * c) Member-level operations like showJVMMetrics() and showOSMetrics()
+ * d) Statistics
+ * </pre>
+ */
+@Category(ManagementTest.class)
+@SuppressWarnings("serial")
+public class DistributedSystemMXBeanDistributedTest implements Serializable {
+
+ private static final String MANAGER_NAME = "managerVM";
+ private static final String MEMBER_NAME = "memberVM-";
+
+ private static InternalCache cache;
+ private static InternalDistributedMember distributedMember;
+ private static SystemManagementService managementService;
+ private static DistributedSystemMXBean distributedSystemMXBean;
+
+ private VM managerVM;
+ private VM memberVM1;
+ private VM memberVM2;
+ private VM memberVM3;
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public SharedErrorCollector errorCollector = new SharedErrorCollector();
+
+ @Before
+ public void setUp() throws Exception {
+ managerVM = getVM(0);
+ memberVM1 = getVM(1);
+ memberVM2 = getVM(2);
+ memberVM3 = getVM(3);
+
+ managerVM.invoke(() -> createManager());
+
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> createMember(memberVM.getId()));
+ }
+ }
+
+ /**
+  * Closes the cache (if one was created) in every VM used by the test and clears the static
+  * per-VM state.
+  *
+  * <p>
+  * The static references are cleared in a {@code finally} block so that a failure while closing
+  * the cache cannot leave stale references behind in the static fields of that VM.
+  */
+ @After
+ public void tearDown() throws Exception {
+   for (VM vm : toArray(managerVM, memberVM1, memberVM2, memberVM3)) {
+     vm.invoke(() -> {
+       try {
+         if (cache != null) {
+           cache.close();
+         }
+       } finally {
+         cache = null;
+         distributedMember = null;
+         managementService = null;
+         distributedSystemMXBean = null;
+       }
+     });
+   }
+ }
+
+ @Test
+ public void getMemberCount() {
+ // 1 manager, 3 members, 1 dunit locator
+ managerVM.invoke(() -> {
+ await()
+ .untilAsserted(() -> assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5));
+ });
+ }
+
+ /**
+  * Verifies on the manager that {@code showJVMMetrics} returns a non-null result for every other
+  * normal member, looked up by member name.
+  */
+ @Test
+ public void showJVMMetrics() {
+   managerVM.invoke(() -> {
+     // Wait for the full membership (1 manager, 3 members, 1 dunit locator). untilAsserted is
+     // used, as in getMemberCount(), so that a timeout reports the actual member count.
+     await()
+         .untilAsserted(() -> assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5));
+
+     for (DistributedMember member : getOtherNormalMembers()) {
+       assertThat(distributedSystemMXBean.showJVMMetrics(member.getName())).isNotNull();
+     }
+   });
+ }
+
+ /**
+  * Verifies on the manager that {@code showOSMetrics} returns a non-null result for every other
+  * normal member, looked up by member name.
+  */
+ @Test
+ public void showOSMetrics() {
+   managerVM.invoke(() -> {
+     // Wait for the full membership (1 manager, 3 members, 1 dunit locator). untilAsserted is
+     // used, as in getMemberCount(), so that a timeout reports the actual member count.
+     await()
+         .untilAsserted(() -> assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5));
+
+     Set<InternalDistributedMember> otherMembers = getOtherNormalMembers();
+     for (DistributedMember member : otherMembers) {
+       assertThat(distributedSystemMXBean.showOSMetrics(member.getName())).isNotNull();
+     }
+   });
+ }
+
+ /**
+  * Invokes {@code shutDownAllMembers} on the manager's {@link DistributedSystemMXBean} and then
+  * waits until the manager no longer sees any other normal members.
+  */
+ @Test
+ public void shutDownAllMembers() {
+   managerVM.invoke(() -> {
+     distributedSystemMXBean.shutDownAllMembers();
+
+     // Polled because the other members are not expected to disappear instantly.
+     await().untilAsserted(
+         () -> assertThat(getOtherNormalMembers()).hasSize(0));
+   });
+ }
+
+ @Test
+ public void listMemberObjectNames() {
+ managerVM.invoke(() -> {
+ await().untilAsserted(
+ () -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(4));
+ });
+ }
+
+ /**
+  * Verifies that {@code fetchMemberObjectName} for the manager's own member name eventually
+  * returns the object name that {@code getMemberMBeanName} produces for that same name.
+  */
+ @Test
+ public void fetchMemberObjectName() {
+   managerVM.invoke(() -> {
+     String managerName = distributedMember.getName();
+     ObjectName expectedName = getMemberMBeanName(managerName);
+
+     // Wait until the bean can resolve the name at all before comparing the result.
+     await().until(() -> distributedSystemMXBean.fetchMemberObjectName(managerName) != null);
+
+     assertThat(distributedSystemMXBean.fetchMemberObjectName(managerName))
+         .isEqualTo(expectedName);
+   });
+ }
+
+ /**
+  * Creates a cache in the current VM that is configured as a started JMX manager, and stores the
+  * cache, its member id, its management service and its {@link DistributedSystemMXBean} in the
+  * static fields of this class.
+  */
+ private void createManager() {
+   Properties managerConfig = getDistributedSystemProperties();
+   managerConfig.setProperty(NAME, MANAGER_NAME);
+
+   // JMX manager settings.
+   managerConfig.setProperty(JMX_MANAGER, "true");
+   managerConfig.setProperty(JMX_MANAGER_START, "true");
+
+   // NOTE(review): port "0" presumably avoids binding fixed ports - confirm.
+   managerConfig.setProperty(JMX_MANAGER_PORT, "0");
+   managerConfig.setProperty(HTTP_SERVICE_PORT, "0");
+
+   cache = (InternalCache) new CacheFactory(managerConfig).create();
+   distributedMember = cache.getDistributionManager().getId();
+
+   SystemManagementService service =
+       (SystemManagementService) ManagementService.getManagementService(cache);
+   managementService = service;
+   distributedSystemMXBean = service.getDistributedSystemMXBean();
+ }
+
+ /**
+  * Creates a cache in the current VM with {@code jmx-manager} set to false, and stores the cache,
+  * its member id and its management service in the static fields of this class.
+  *
+  * @param vmId id appended to {@code MEMBER_NAME} to form the member's name
+  */
+ private void createMember(int vmId) {
+   String memberName = MEMBER_NAME + vmId;
+
+   Properties memberConfig = getDistributedSystemProperties();
+   memberConfig.setProperty(NAME, memberName);
+   memberConfig.setProperty(JMX_MANAGER, "false");
+
+   cache = (InternalCache) new CacheFactory(memberConfig).create();
+   distributedMember = cache.getDistributionManager().getId();
+   managementService = (SystemManagementService) ManagementService.getManagementService(cache);
+ }
+
+ /**
+  * Returns the other normal distribution manager ids as reported by the distribution manager of
+  * the cache in the current VM.
+  */
+ private Set<InternalDistributedMember> getOtherNormalMembers() {
+   Set<InternalDistributedMember> otherMembers =
+       cache.getDistributionManager().getOtherNormalDistributionManagerIds();
+   return otherMembers;
+ }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
new file mode 100644
index 0000000..67478b6
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management;
+
+import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
+import static org.apache.geode.distributed.ConfigurationProperties.NAME;
+import static org.apache.geode.internal.alerting.AlertLevel.ERROR;
+import static org.apache.geode.internal.alerting.AlertLevel.NONE;
+import static org.apache.geode.internal.alerting.AlertLevel.SEVERE;
+import static org.apache.geode.internal.alerting.AlertLevel.WARNING;
+import static org.apache.geode.management.JMXNotificationType.SYSTEM_ALERT;
+import static org.apache.geode.management.JMXNotificationUserData.ALERT_LEVEL;
+import static org.apache.geode.management.ManagementService.getManagementService;
+import static org.apache.geode.management.internal.MBeanJMXAdapter.getDistributedSystemName;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.VM.toArray;
+import static org.apache.geode.test.dunit.standalone.DUnitLauncher.getDistributedSystemProperties;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.Notification;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.alerting.AlertLevel;
+import org.apache.geode.internal.alerting.AlertingService;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.dunit.rules.SharedErrorCollector;
+import org.apache.geode.test.junit.categories.AlertingTest;
+import org.apache.geode.test.junit.categories.ManagementTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * Distributed tests for {@link DistributedSystemMXBean} with alerts. Extracted from
+ * {@link DistributedSystemMXBeanDistributedTest}.
+ */
+@Category({ManagementTest.class, AlertingTest.class})
+@SuppressWarnings("serial")
+public class DistributedSystemMXBeanWithAlertsDistributedTest implements Serializable {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private static final String MANAGER_NAME = "managerVM";
+ private static final String MEMBER_NAME = "memberVM-";
+ private static final long TIMEOUT = getTimeout().getValueInMS();
+ private static final NotificationFilter SYSTEM_ALERT_FILTER =
+ notification -> notification.getType().equals(SYSTEM_ALERT);
+
+ private static InternalCache cache;
+ private static AlertingService alertingService;
+ private static NotificationListener notificationListener;
+ private static DistributedSystemMXBean distributedSystemMXBean;
+
+ private DistributedMember managerMember;
+
+ private String warningLevelMessage;
+ private String errorLevelMessage;
+ private String severeLevelMessage;
+
+ private Alert warningAlert;
+ private Alert errorAlert;
+ private Alert severeAlert;
+
+ private VM managerVM;
+ private VM memberVM1;
+ private VM memberVM2;
+ private VM memberVM3;
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Rule
+ public SharedErrorCollector errorCollector = new SharedErrorCollector();
+
+ /**
+ * Builds the per-test alert messages and the expected alerts, creates the manager in VM 0 and
+ * then creates one member in each of VMs 1 through 3.
+ */
+ @Before
+ public void setUp() throws Exception {
+ String methodName = testName.getMethodName();
+
+ warningLevelMessage = WARNING.name() + " level alert in " + methodName;
+ errorLevelMessage = ERROR.name() + " level alert in " + methodName;
+ severeLevelMessage = SEVERE.name() + " level alert in " + methodName;
+
+ warningAlert = new Alert(WARNING, warningLevelMessage);
+ errorAlert = new Alert(ERROR, errorLevelMessage);
+ severeAlert = new Alert(SEVERE, severeLevelMessage);
+
+ managerVM = getVM(0);
+ memberVM1 = getVM(1);
+ memberVM2 = getVM(2);
+ memberVM3 = getVM(3);
+
+ // managerMember has to be known before the members are created: createMember waits until an
+ // alert listener for managerMember is registered
+ managerMember = managerVM.invoke(() -> createManager());
+
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> createMember(memberVM));
+ }
+ }
+
+ /**
+ * Closes the cache (if one exists) and clears the static fields in the manager VM and in every
+ * member VM.
+ */
+ @After
+ public void tearDown() throws Exception {
+ for (VM eachVM : toArray(managerVM, memberVM1, memberVM2, memberVM3)) {
+ eachVM.invoke(() -> {
+ if (cache != null) {
+ cache.close();
+ }
+ // reset all per-VM static state
+ distributedSystemMXBean = null;
+ notificationListener = null;
+ alertingService = null;
+ cache = null;
+ });
+ }
+ }
+
+ /**
+ * Verifies that every member VM eventually has an alert listener registered for the manager at
+ * {@code SEVERE}, without the test changing the alert level.
+ */
+ @Test
+ public void managerAddsAlertListenerToEachMember() {
+ // default AlertLevel is SEVERE
+
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ await().untilAsserted(() -> {
+ boolean hasListener = alertingService.hasAlertListener(managerMember, SEVERE);
+ assertThat(hasListener).isTrue();
+ });
+ });
+ }
+ }
+
+ /**
+ * Logs a fatal message in memberVM1 and expects the manager's spy listener to capture an alert
+ * equal to {@code severeAlert}.
+ */
+ @Test
+ public void managerReceivesRemoteAlertAtAlertLevel() {
+ memberVM1.invoke(() -> {
+ try (IgnoredException ignored = addIgnoredException(severeLevelMessage)) {
+ logger.fatal(severeLevelMessage);
+ }
+ });
+
+ managerVM.invoke(() -> {
+ Alert received = captureAlert();
+ assertThat(received).isEqualTo(severeAlert);
+ });
+ }
+
+ /**
+ * Logs a warning and an error in memberVM1 and expects the manager's spy listener to have had no
+ * interactions.
+ *
+ * <p>
+ * NOTE(review): the verification runs immediately, without any wait, so a notification that
+ * arrives late would not fail this test. Confirm this is acceptable.
+ */
+ @Test
+ public void managerDoesNotReceiveRemoteAlertBelowAlertLevel() {
+ memberVM1.invoke(() -> {
+ try (IgnoredException ignored = addIgnoredException(warningLevelMessage)) {
+ logger.warn(warningLevelMessage);
+ }
+ try (IgnoredException ignored = addIgnoredException(errorLevelMessage)) {
+ logger.error(errorLevelMessage);
+ }
+ });
+
+ managerVM.invoke(() -> verifyZeroInteractions(notificationListener));
+ }
+
+ /**
+ * Lowers the alert level to {@code WARNING}, logs an error and a fatal message in memberVM1 and
+ * expects the manager to capture two alerts containing {@code errorAlert} and
+ * {@code severeAlert}.
+ */
+ @Test
+ public void managerReceivesRemoteAlertAboveAlertLevel() {
+ changeAlertLevel(WARNING);
+
+ memberVM1.invoke(() -> {
+ try (IgnoredException ignored = addIgnoredException(errorLevelMessage)) {
+ logger.error(errorLevelMessage);
+ }
+ try (IgnoredException ignored = addIgnoredException(severeLevelMessage)) {
+ logger.fatal(severeLevelMessage);
+ }
+ });
+
+ managerVM.invoke(() -> {
+ List<Alert> received = captureAllAlerts(2);
+ assertThat(received).contains(errorAlert, severeAlert);
+ });
+ }
+
+ /**
+ * Logs a fatal message in the manager VM itself and expects the manager's spy listener to
+ * capture an alert equal to {@code severeAlert}.
+ */
+ @Test
+ public void managerReceivesLocalAlertAtAlertLevel() {
+ managerVM.invoke(() -> {
+ try (IgnoredException ignored = addIgnoredException(severeLevelMessage)) {
+ logger.fatal(severeLevelMessage);
+ }
+ });
+
+ managerVM.invoke(() -> {
+ Alert received = captureAlert();
+ assertThat(received).isEqualTo(severeAlert);
+ });
+ }
+
+ /**
+ * Logs a warning and an error in the manager VM itself and expects the manager's spy listener to
+ * have had no interactions.
+ *
+ * <p>
+ * NOTE(review): the verification runs immediately, without any wait, so a notification that
+ * arrives late would not fail this test. Confirm this is acceptable.
+ */
+ @Test
+ public void managerDoesNotReceiveLocalAlertBelowAlertLevel() {
+ managerVM.invoke(() -> {
+ try (IgnoredException ignored = addIgnoredException(warningLevelMessage)) {
+ logger.warn(warningLevelMessage);
+ }
+ try (IgnoredException ignored = addIgnoredException(errorLevelMessage)) {
+ logger.error(errorLevelMessage);
+ }
+ });
+
+ managerVM.invoke(() -> verifyZeroInteractions(notificationListener));
+ }
+
+ /**
+ * Fails due to GEODE-5923: JMX manager only receives local Alerts at the default AlertLevel
+ *
+ * <p>
+ * The JMX manager's local AlertListener for itself remains stuck at {@code SEVERE} even after
+ * invoking {@link DistributedSystemMXBean#changeAlertLevel(String)}.
+ *
+ * <p>
+ * Once enabled, the test lowers the alert level to {@code WARNING}, logs an error and a fatal
+ * message in the manager VM and expects two captured alerts containing {@code errorAlert} and
+ * {@code severeAlert}.
+ */
+ @Test
+ @Ignore("GEODE-5923")
+ public void managerReceivesLocalAlertAboveAlertLevel() {
+ changeAlertLevel(WARNING);
+
+ managerVM.invoke(() -> {
+ try (IgnoredException ignored = addIgnoredException(errorLevelMessage)) {
+ logger.error(errorLevelMessage);
+ }
+ try (IgnoredException ignored = addIgnoredException(severeLevelMessage)) {
+ logger.fatal(severeLevelMessage);
+ }
+ });
+
+ managerVM.invoke(() -> {
+ List<Alert> received = captureAllAlerts(2);
+ assertThat(received).contains(errorAlert, severeAlert);
+ });
+ }
+
+ /**
+ * Logs one fatal message in each of the three member VMs and expects the manager to capture
+ * three alerts containing {@code severeAlert}.
+ */
+ @Test
+ public void managerReceivesAlertsFromAllMembers() {
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ try (IgnoredException ignored = addIgnoredException(severeLevelMessage)) {
+ logger.fatal(severeLevelMessage);
+ }
+ });
+ }
+
+ managerVM.invoke(() -> {
+ List<Alert> received = captureAllAlerts(3);
+ assertThat(received).contains(severeAlert, severeAlert, severeAlert);
+ });
+ }
+
+ /**
+ * Lowers the alert level to {@code WARNING}, logs a warning, an error and a fatal message in
+ * each of the three member VMs and expects the manager to capture nine alerts containing the
+ * warning, error and severe alerts.
+ */
+ @Test
+ public void managerReceivesAlertsFromAllMembersAtAlertLevelAndAbove() {
+ changeAlertLevel(WARNING);
+
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ try (IgnoredException ignored = addIgnoredException(warningLevelMessage)) {
+ logger.warn(warningLevelMessage);
+ }
+ try (IgnoredException ignored = addIgnoredException(errorLevelMessage)) {
+ logger.error(errorLevelMessage);
+ }
+ try (IgnoredException ignored = addIgnoredException(severeLevelMessage)) {
+ logger.fatal(severeLevelMessage);
+ }
+ });
+ }
+
+ managerVM.invoke(() -> {
+ List<Alert> received = captureAllAlerts(9);
+ assertThat(received).contains(
+ warningAlert, warningAlert, warningAlert,
+ errorAlert, errorAlert, errorAlert,
+ severeAlert, severeAlert, severeAlert);
+ });
+ }
+
+ /**
+ * Changes the alert level to {@code NONE}, logs a warning, an error and a fatal message in each
+ * of the three member VMs and expects the manager's spy listener to have had no interactions.
+ *
+ * <p>
+ * NOTE(review): the verification runs immediately, without any wait, so a notification that
+ * arrives late would not fail this test. Confirm this is acceptable.
+ */
+ @Test
+ public void managerDoesNotReceiveAlertsAtAlertLevelNone() {
+ changeAlertLevel(NONE);
+
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ try (IgnoredException ignored = addIgnoredException(warningLevelMessage)) {
+ logger.warn(warningLevelMessage);
+ }
+ try (IgnoredException ignored = addIgnoredException(errorLevelMessage)) {
+ logger.error(errorLevelMessage);
+ }
+ try (IgnoredException ignored = addIgnoredException(severeLevelMessage)) {
+ logger.fatal(severeLevelMessage);
+ }
+ });
+ }
+
+ managerVM.invoke(() -> verifyZeroInteractions(notificationListener));
+ }
+
+ /**
+ * Verifies that a manager which is started after alerts were generated does not receive those
+ * earlier alerts, but does receive alerts generated once the members have registered an alert
+ * listener for it.
+ */
+ @Test
+ public void managerMissesAnyAlertsBeforeItStarts() {
+ // close managerVM so we can recreate it AFTER generating alerts in memberVMs
+ managerVM.invoke(() -> cache.close());
+
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ try (IgnoredException ie = addIgnoredException(severeLevelMessage)) {
+ logger.fatal(severeLevelMessage);
+ }
+ });
+ }
+
+ // createManager already returns the new member id, so a single remote call both restarts the
+ // manager and refreshes managerMember (the same pattern setUp uses)
+ managerMember = managerVM.invoke(() -> createManager());
+
+ // wait until every member has registered an alert listener for the recreated manager
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ await().until(() -> alertingService.hasAlertListener(managerMember, SEVERE));
+ });
+ }
+
+ // managerVM should have missed the alerts from BEFORE it started
+
+ managerVM.invoke(() -> {
+ verifyZeroInteractions(notificationListener);
+ });
+
+ // managerVM should now receive any new alerts though
+
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ try (IgnoredException ie = addIgnoredException(severeLevelMessage)) {
+ logger.fatal(severeLevelMessage);
+ }
+ });
+ }
+
+ managerVM.invoke(() -> {
+ assertThat(captureAllAlerts(3)).contains(severeAlert, severeAlert, severeAlert);
+ });
+ }
+
+ /**
+ * Creates the cache for the Manager VM with the JMX manager enabled and started, registers a
+ * Mockito spy listener (filtered to system alerts) on the distributed system MBean and records
+ * the cache, alerting service, spy and DistributedSystemMXBean in this VM's static fields.
+ *
+ * @return the member id of the newly created manager
+ * @throws InstanceNotFoundException if the distributed system MBean is not registered in the
+ *         platform MBean server
+ */
+ private DistributedMember createManager() throws InstanceNotFoundException {
+ Properties managerConfig = getDistributedSystemProperties();
+ managerConfig.setProperty(NAME, MANAGER_NAME);
+ managerConfig.setProperty(HTTP_SERVICE_PORT, "0");
+ managerConfig.setProperty(JMX_MANAGER_PORT, "0");
+ managerConfig.setProperty(JMX_MANAGER_START, "true");
+ managerConfig.setProperty(JMX_MANAGER, "true");
+
+ // assign the cache first so that tearDown can still close it if a later step throws
+ cache = (InternalCache) new CacheFactory(managerConfig).create();
+ alertingService = cache.getInternalDistributedSystem().getAlertingService();
+
+ notificationListener = spy(NotificationListener.class);
+ getPlatformMBeanServer().addNotificationListener(
+ getDistributedSystemName(), notificationListener, SYSTEM_ALERT_FILTER, null);
+
+ distributedSystemMXBean = getManagementService(cache).getDistributedSystemMXBean();
+
+ DistributedMember managerId = cache.getDistributionManager().getId();
+ return managerId;
+ }
+
+ /**
+ * Creates the cache for a member that is not a JMX manager, records the cache and its alerting
+ * service in this VM's static fields and waits until an alert listener for the manager is
+ * registered at {@code SEVERE}.
+ *
+ * @param memberVM the VM whose id is appended to {@code MEMBER_NAME} to form the member name
+ */
+ private void createMember(VM memberVM) {
+ String memberName = MEMBER_NAME + memberVM.getId();
+
+ Properties memberConfig = getDistributedSystemProperties();
+ memberConfig.setProperty(JMX_MANAGER, "false");
+ memberConfig.setProperty(NAME, memberName);
+
+ cache = (InternalCache) new CacheFactory(memberConfig).create();
+ alertingService = cache.getInternalDistributedSystem().getAlertingService();
+
+ await().until(() -> alertingService.hasAlertListener(managerMember, SEVERE));
+ }
+
+ /**
+ * Changes the alert level through the manager's DistributedSystemMXBean and then waits in every
+ * member VM until the change is visible: for {@code NONE} until no alert listener for the
+ * manager is reported at that level, otherwise until one is.
+ *
+ * @param alertLevel the new alert level
+ */
+ private void changeAlertLevel(AlertLevel alertLevel) {
+ managerVM.invoke(() -> {
+ distributedSystemMXBean.changeAlertLevel(alertLevel.name());
+ });
+
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ // NONE means the listener is expected to be absent; any other level means present
+ boolean listenerExpected = alertLevel != NONE;
+ await().until(
+ () -> alertingService.hasAlertListener(managerMember, alertLevel) == listenerExpected);
+ });
+ }
+ }
+
+ /**
+ * Verifies, waiting up to {@code TIMEOUT}, that the spy listener handled a notification with a
+ * null handback, and returns the captured notification.
+ */
+ private Notification captureNotification() {
+ ArgumentCaptor<Notification> notificationCaptor = ArgumentCaptor.forClass(Notification.class);
+ NotificationListener verifiedListener = verify(notificationListener, timeout(TIMEOUT));
+ verifiedListener.handleNotification(notificationCaptor.capture(), isNull());
+ return notificationCaptor.getValue();
+ }
+
+ /**
+ * Verifies, waiting up to {@code TIMEOUT}, that the spy listener handled {@code count}
+ * notifications with a null handback, and returns all captured notifications.
+ *
+ * @param count the number of handleNotification calls to verify
+ */
+ private List<Notification> captureAllNotifications(int count) {
+ ArgumentCaptor<Notification> notificationCaptor = ArgumentCaptor.forClass(Notification.class);
+ NotificationListener verifiedListener =
+ verify(notificationListener, timeout(TIMEOUT).times(count));
+ verifiedListener.handleNotification(notificationCaptor.capture(), isNull());
+ return notificationCaptor.getAllValues();
+ }
+
+ /**
+ * Captures one notification from the spy listener and converts it into an {@link Alert} built
+ * from the notification's alert level and message.
+ */
+ private Alert captureAlert() {
+ Notification captured = captureNotification();
+ AlertLevel level = getAlertLevel(captured);
+ String message = captured.getMessage();
+ return new Alert(level, message);
+ }
+
+ /**
+ * Captures {@code count} notifications from the spy listener and converts each one, in capture
+ * order, into an {@link Alert} built from its alert level and message.
+ *
+ * @param count the number of notifications to capture
+ */
+ private List<Alert> captureAllAlerts(int count) {
+ List<Alert> converted = new ArrayList<>();
+ captureAllNotifications(count).forEach(
+ captured -> converted.add(new Alert(getAlertLevel(captured), captured.getMessage())));
+ return converted;
+ }
+
+ /**
+ * Reads the {@code ALERT_LEVEL} entry from the notification's user data and maps it to the
+ * {@link AlertLevel} constant with the same upper-cased name.
+ *
+ * @param notification a notification whose user data contains an {@code ALERT_LEVEL} entry
+ * @return the matching alert level
+ * @throws IllegalArgumentException if the upper-cased value is not an {@link AlertLevel} name
+ */
+ private static AlertLevel getAlertLevel(Notification notification) {
+ // Locale.ROOT keeps the case mapping independent of the default locale; under a Turkish
+ // locale plain toUpperCase() would turn "warning" into "WARNİNG" and valueOf would throw
+ String levelName = getUserData(notification).get(ALERT_LEVEL).toUpperCase(Locale.ROOT);
+ return AlertLevel.valueOf(levelName);
+ }
+
+ /**
+ * Returns the notification's user data cast to a {@code Map<String, String>}.
+ *
+ * @param notification the notification to read
+ * @return the user data of the notification
+ */
+ // The cast cannot be checked at runtime because of erasure. This test only looks up the
+ // ALERT_LEVEL entry as a String, so the warning is suppressed on this method alone.
+ @SuppressWarnings("unchecked")
+ private static Map<String, String> getUserData(Notification notification) {
+ return (Map<String, String>) notification.getUserData();
+ }
+
+ /**
+ * Simple struct with {@link AlertLevel} and {@code String} message with {@code equals}
+ * implemented to compare both fields. {@code toString} prints both fields so that assertion
+ * failure messages show the level and message rather than an identity hash.
+ */
+ private static class Alert implements Serializable {
+
+ private final AlertLevel level;
+ private final String message;
+
+ Alert(AlertLevel level, String message) {
+ this.level = level;
+ this.message = message;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ Alert alert = (Alert) obj;
+ return level == alert.level && Objects.equals(message, alert.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(level, message);
+ }
+
+ @Override
+ public String toString() {
+ return "Alert{level=" + level + ", message='" + message + "'}";
+ }
+ }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithNotificationsDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithNotificationsDistributedTest.java
new file mode 100644
index 0000000..b7a581a
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithNotificationsDistributedTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management;
+
+import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
+import static org.apache.geode.distributed.ConfigurationProperties.NAME;
+import static org.apache.geode.management.JMXNotificationType.REGION_CREATED;
+import static org.apache.geode.management.internal.MBeanJMXAdapter.getMemberMBeanName;
+import static org.apache.geode.management.internal.MBeanJMXAdapter.getMemberNameOrId;
+import static org.apache.geode.management.internal.ManagementConstants.REGION_CREATED_PREFIX;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.VM.toArray;
+import static org.apache.geode.test.dunit.standalone.DUnitLauncher.getDistributedSystemProperties;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.internal.NotificationHub;
+import org.apache.geode.management.internal.NotificationHub.NotificationHubListener;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.management.internal.beans.MemberMBean;
+import org.apache.geode.management.internal.beans.SequenceNumber;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.dunit.rules.SharedErrorCollector;
+import org.apache.geode.test.junit.categories.ManagementTest;
+
+/**
+ * Distributed tests for {@link DistributedSystemMXBean} notifications. Extracted from
+ * {@link DistributedSystemMXBeanDistributedTest}.
+ *
+ * <p>
+ * TODO: test all notifications emitted by DistributedSystemMXBean:
+ *
+ * <pre>
+ * a) gemfire.distributedsystem.member.joined
+ * b) gemfire.distributedsystem.member.left
+ * c) gemfire.distributedsystem.member.suspect
+ * d) All notifications emitted by MemberMXBeans
+ * </pre>
+ */
+@Category(ManagementTest.class)
+@SuppressWarnings("serial")
+public class DistributedSystemMXBeanWithNotificationsDistributedTest implements Serializable {
+
+ private static final long TIMEOUT = getTimeout().getValueInMS();
+ private static final String MANAGER_NAME = "managerVM";
+ private static final String MEMBER_NAME = "memberVM-";
+
+ /** One NotificationListener is added for the DistributedSystemMXBean in Manager VM. */
+ private static final int ONE_LISTENER_FOR_MANAGER = 1;
+
+ /** One NotificationListener is added for spying by the test. */
+ private static final int ONE_LISTENER_FOR_SPYING = 1;
+
+ /** Three Member VMs, one Manager VM and one DUnit Locator VM. */
+ private static final int CLUSTER_SIZE = 5;
+
+ /** Three Member VMs. */
+ private static final int THREE_MEMBERS = 3;
+
+ private static InternalCache cache;
+ private static InternalDistributedMember distributedMember;
+ private static SystemManagementService managementService;
+ private static NotificationListener notificationListener;
+ private static DistributedSystemMXBean distributedSystemMXBean;
+
+ private VM managerVM;
+ private VM memberVM1;
+ private VM memberVM2;
+ private VM memberVM3;
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public SharedErrorCollector errorCollector = new SharedErrorCollector();
+
+ /**
+ * Creates the manager in VM 0 and then one member in each of VMs 1 through 3.
+ */
+ @Before
+ public void setUp() throws Exception {
+ managerVM = getVM(0);
+ memberVM1 = getVM(1);
+ memberVM2 = getVM(2);
+ memberVM3 = getVM(3);
+
+ managerVM.invoke(() -> {
+ createManager();
+ });
+
+ for (VM eachMemberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ eachMemberVM.invoke(() -> {
+ createMember(eachMemberVM.getId());
+ });
+ }
+ }
+
+ /**
+ * Closes the cache (if one exists) and clears every static field in the manager VM and in each
+ * member VM.
+ */
+ @After
+ public void tearDown() throws Exception {
+ for (VM vm : toArray(managerVM, memberVM1, memberVM2, memberVM3)) {
+ vm.invoke(() -> {
+ if (cache != null) {
+ cache.close();
+ }
+ cache = null;
+ distributedMember = null;
+ managementService = null;
+ // also drop the Mockito spy assigned by createManager so that it does not stay reachable
+ // from this static field after the test
+ notificationListener = null;
+ distributedSystemMXBean = null;
+ });
+ }
+ }
+
+ /**
+ * Walks through the listener life cycle of the NotificationHub: adds a spy listener to every
+ * member MBean known to the manager, checks the listener count in each member VM, sends one
+ * notification from each member VM and verifies the spy received three of them, removes the spy
+ * again, re-checks the listener count and finally verifies that
+ * {@code NotificationHub#cleanUpListeners()} empties the listener map.
+ */
+ @Test
+ public void testNotificationHub() {
+ // add spy notificationListener to each MemberMXBean in cluster
+ managerVM.invoke(() -> {
+ // wait until Manager VM has MemberMXBean for each node in cluster
+ await().untilAsserted(
+ () -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(CLUSTER_SIZE));
+
+ for (ObjectName objectName : distributedSystemMXBean.listMemberObjectNames()) {
+ getPlatformMBeanServer().addNotificationListener(objectName, notificationListener, null,
+ null);
+ }
+ });
+
+ // verify each Member VM has one spy listener in addition to one for DistributedSystemMXBean
+ // NOTE(review): this is asserted once, without await; presumably the hub listener is already
+ // registered by the time addNotificationListener returns in the manager - confirm
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ Map<ObjectName, NotificationHubListener> listenerObjectMap =
+ managementService.getNotificationHub().getListenerObjectMap();
+ NotificationHubListener hubListener =
+ listenerObjectMap.get(getMemberMBeanName(distributedMember));
+
+ assertThat(hubListener.getNumCounter())
+ .isEqualTo(ONE_LISTENER_FOR_SPYING + ONE_LISTENER_FOR_MANAGER);
+ });
+ }
+
+ // send a dummy notification from each Member VM (no actual region is created)
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ Notification notification =
+ new Notification(REGION_CREATED, getMemberNameOrId(distributedMember),
+ SequenceNumber.next(), System.currentTimeMillis(), REGION_CREATED_PREFIX + "/test");
+ NotificationBroadcasterSupport notifier = (MemberMBean) managementService.getMemberMXBean();
+ notifier.sendNotification(notification);
+ });
+ }
+
+ // remove spy notificationListener from each MemberMXBean in cluster
+ managerVM.invoke(() -> {
+ // first wait (up to TIMEOUT) until the spy has seen one notification per member VM
+ verify(notificationListener, timeout(TIMEOUT).times(THREE_MEMBERS))
+ .handleNotification(isA(Notification.class), isNull());
+
+ for (ObjectName objectName : distributedSystemMXBean.listMemberObjectNames()) {
+ getPlatformMBeanServer().removeNotificationListener(objectName, notificationListener);
+ }
+ });
+
+ // verify each Member VM has just one listener for DistributedSystemMXBean
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ Map<ObjectName, NotificationHubListener> listenerObjectMap =
+ managementService.getNotificationHub().getListenerObjectMap();
+ NotificationHubListener hubListener =
+ listenerObjectMap.get(getMemberMBeanName(distributedMember));
+
+ assertThat(hubListener.getNumCounter()).isEqualTo(ONE_LISTENER_FOR_MANAGER);
+ });
+ }
+
+ // verify NotificationHub#cleanUpListeners() behavior in each Member VM
+ for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
+ memberVM.invoke(() -> {
+ NotificationHub notificationHub = managementService.getNotificationHub();
+ notificationHub.cleanUpListeners();
+
+ assertThat(notificationHub.getListenerObjectMap()).isEmpty();
+ });
+ }
+ }
+
+ /**
+ * Creates the cache for the Manager VM with the JMX manager enabled and started, then records
+ * the cache, its member id, the management service, a Mockito spy listener and the
+ * DistributedSystemMXBean in this VM's static fields. The spy is only created here; the test
+ * registers it later.
+ */
+ private void createManager() {
+ Properties managerConfig = getDistributedSystemProperties();
+ managerConfig.setProperty(NAME, MANAGER_NAME);
+ managerConfig.setProperty(HTTP_SERVICE_PORT, "0");
+ managerConfig.setProperty(JMX_MANAGER_PORT, "0");
+ managerConfig.setProperty(JMX_MANAGER_START, "true");
+ managerConfig.setProperty(JMX_MANAGER, "true");
+
+ // assign the cache first so that tearDown can still close it if a later step throws
+ cache = (InternalCache) new CacheFactory(managerConfig).create();
+
+ distributedMember = cache.getDistributionManager().getId();
+ managementService = (SystemManagementService) ManagementService.getManagementService(cache);
+ notificationListener = spy(NotificationListener.class);
+ distributedSystemMXBean = managementService.getDistributedSystemMXBean();
+ }
+
+ /**
+ * Creates the cache for a member that is not a JMX manager and records the cache, its member id
+ * and the management service in this VM's static fields.
+ *
+ * @param vmId id of the VM, appended to {@code MEMBER_NAME} to form the member name
+ */
+ private void createMember(int vmId) {
+ String memberName = MEMBER_NAME + vmId;
+
+ Properties memberConfig = getDistributedSystemProperties();
+ memberConfig.setProperty(JMX_MANAGER, "false");
+ memberConfig.setProperty(NAME, memberName);
+
+ cache = (InternalCache) new CacheFactory(memberConfig).create();
+
+ distributedMember = cache.getDistributionManager().getId();
+ managementService = (SystemManagementService) ManagementService.getManagementService(cache);
+ }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/DistributedSystemMXBeanIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/DistributedSystemMXBeanIntegrationTest.java
index fefc923..ed11dd1 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/DistributedSystemMXBeanIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/DistributedSystemMXBeanIntegrationTest.java
@@ -17,25 +17,30 @@
package org.apache.geode.management;
import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
-import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_TIME_STATISTICS;
+import static javax.management.JMX.newMXBeanProxy;
import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.NAME;
-import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
+import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
+import static org.apache.geode.internal.alerting.AlertLevel.NONE;
+import static org.apache.geode.internal.alerting.AlertLevel.WARNING;
+import static org.apache.geode.management.JMXNotificationType.SYSTEM_ALERT;
import static org.apache.geode.management.internal.MBeanJMXAdapter.getDistributedSystemName;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
import static org.apache.geode.test.dunit.standalone.DUnitLauncher.getDistributedSystemProperties;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import java.util.Properties;
-import javax.management.JMX;
import javax.management.Notification;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
@@ -48,23 +53,39 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.mockito.ArgumentCaptor;
import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.internal.alerting.AlertLevel;
+import org.apache.geode.internal.alerting.AlertingService;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.junit.categories.AlertingTest;
import org.apache.geode.test.junit.categories.ManagementTest;
/**
* Integration tests for {@link DistributedSystemMXBean} with local {@link DistributedSystem}
* connection.
+ *
+ * <p>
+ * TODO:GEODE-5923: write more tests after fixing GEODE-5923
*/
-@Category(ManagementTest.class)
+@Category({ManagementTest.class, AlertingTest.class})
public class DistributedSystemMXBeanIntegrationTest {
+ private static final Logger logger = LogService.getLogger();
+
+ private static final long TIMEOUT = getTimeout().getValueInMS();
+ private static final NotificationFilter SYSTEM_ALERT_FILTER =
+ notification -> notification.getType().equals(SYSTEM_ALERT);
+
private String name;
private InternalCache cache;
- private Logger logger;
+ private DistributedMember distributedMember;
+ private AlertingService alertingService;
+ private NotificationListener notificationListener;
private String alertMessage;
private DistributedSystemMXBean distributedSystemMXBean;
@@ -74,25 +95,28 @@ public class DistributedSystemMXBeanIntegrationTest {
@Before
public void setUp() throws Exception {
+ alertMessage = "Alerting in " + testName.getMethodName();
name = "Manager in " + testName.getMethodName();
+ String startLocator = getServerHostName() + "[" + getRandomAvailableTCPPort() + "]";
+
Properties config = getDistributedSystemProperties();
config.setProperty(NAME, name);
-
- config.setProperty(LOCATORS, "");
+ config.setProperty(START_LOCATOR, startLocator);
config.setProperty(JMX_MANAGER, "true");
config.setProperty(JMX_MANAGER_START, "true");
config.setProperty(JMX_MANAGER_PORT, "0");
config.setProperty(HTTP_SERVICE_PORT, "0");
- config.setProperty(ENABLE_TIME_STATISTICS, "true");
- config.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
cache = (InternalCache) new CacheFactory(config).create();
- logger = LogService.getLogger();
+ distributedMember = cache.getDistributionManager().getId();
+ alertingService = cache.getInternalDistributedSystem().getAlertingService();
- alertMessage = "Alerting in " + testName.getMethodName();
+ notificationListener = spy(NotificationListener.class);
+ getPlatformMBeanServer().addNotificationListener(getDistributedSystemName(),
+ notificationListener, SYSTEM_ALERT_FILTER, null);
- distributedSystemMXBean = JMX.newMXBeanProxy(getPlatformMBeanServer(),
+ distributedSystemMXBean = newMXBeanProxy(getPlatformMBeanServer(),
getDistributedSystemName(), DistributedSystemMXBean.class);
}
@@ -112,23 +136,40 @@ public class DistributedSystemMXBeanIntegrationTest {
assertThat(distributedSystemMXBean.listMembers()).containsExactly(name);
}
+ /** Logs at fatal level and expects a captured notification carrying the same message. */
+ @Test
+ public void receivesLocalSystemAlertNotificationAtDefaultAlertLevel() {
+   logger.fatal(alertMessage);
+
+   Notification received = captureNotification();
+   assertThat(received.getMessage()).isEqualTo(alertMessage);
+ }
+
/**
- * This test confirms existence of bug GEODE-5923.
+ * Changes the alert level to WARNING through the MXBean, logs at warn level, and expects a
+ * notification whose message equals the logged alert message.
+ *
+ * <p>
+ * Fails due to GEODE-5923: JMX manager receives local Alerts only for the default AlertLevel
*/
@Test
- @Ignore("GEODE-5923")
- public void providesSystemAlertNotification() throws Exception {
- NotificationListener notificationListener = spy(NotificationListener.class);
- NotificationFilter notificationFilter = (Notification notification) -> notification.getType()
- .equals(JMXNotificationType.SYSTEM_ALERT);
- getPlatformMBeanServer().addNotificationListener(getDistributedSystemName(),
- notificationListener, notificationFilter, null);
+ @Ignore("TODO:GEODE-5923: re-enable test after fixing GEODE-5923")
+ public void receivesLocalSystemAlertNotificationAtNewAlertLevel() throws Exception {
+ changeAlertLevel(WARNING);
- // work around GEODE-5924 by invoking DistributedSystemMXBean.changeAlertLevel
- distributedSystemMXBean.changeAlertLevel("warning");
+ logger.warn(alertMessage);
- logger.fatal(alertMessage);
+ assertThat(captureNotification().getMessage()).isEqualTo(alertMessage);
+ }
+
+ /**
+  * Waits up to TIMEOUT milliseconds for the spied listener to be invoked with a null handback
+  * and returns the notification that was passed to it.
+  */
+ private Notification captureNotification() {
+   ArgumentCaptor<Notification> captor = ArgumentCaptor.forClass(Notification.class);
+
+   NotificationListener verifiedListener = verify(notificationListener, timeout(TIMEOUT));
+   verifiedListener.handleNotification(captor.capture(), isNull());
+
+   return captor.getValue();
+ }
+
+ private void changeAlertLevel(AlertLevel alertLevel) throws Exception { // returns once applied
+ distributedSystemMXBean.changeAlertLevel(alertLevel.name());
- verify(notificationListener).handleNotification(isA(Notification.class), isNull());
+ if (alertLevel == NONE) { // NONE: wait until hasAlertListener reports false for this member
+ await().until(() -> !alertingService.hasAlertListener(distributedMember, alertLevel));
+ } else { // any other level: wait until hasAlertListener reports true for that level
+ await().until(() -> alertingService.hasAlertListener(distributedMember, alertLevel));
+ }
}
}