You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2017/03/09 20:48:47 UTC
[08/50] [abbrv] geode git commit: GEODE-2568: The JMX MBean is now
removed when its AsyncEventQueue is removed
GEODE-2568: The JMX MBean is now removed when its AsyncEventQueue is removed
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/e5121aba
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/e5121aba
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/e5121aba
Branch: refs/heads/feature/GEODE-1969
Commit: e5121abad3b4c11620b0be037ada4484e7248875
Parents: 4e61799
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Wed Mar 1 13:20:31 2017 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Fri Mar 3 15:08:53 2017 -0800
----------------------------------------------------------------------
.../distributed/internal/ResourceEvent.java | 1 +
.../geode/internal/cache/GemFireCacheImpl.java | 1 +
.../geode/management/JMXNotificationType.java | 8 ++
.../internal/ManagementConstants.java | 2 +
.../internal/beans/AsyncEventQueueMBean.java | 3 +
.../beans/AsyncEventQueueMBeanBridge.java | 3 +
.../internal/beans/ManagementAdapter.java | 36 ++++++++
.../internal/beans/ManagementListener.java | 4 +
.../geode/internal/cache/wan/WANTestBase.java | 6 ++
.../management/WANManagementDUnitTest.java | 92 ++++++++++++++------
10 files changed, 131 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
index acfb157..b004113 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
@@ -34,6 +34,7 @@ public enum ResourceEvent {
MANAGER_STOP,
LOCATOR_START,
ASYNCEVENTQUEUE_CREATE,
+ ASYNCEVENTQUEUE_REMOVE,
SYSTEM_ALERT,
CACHE_SERVER_START,
CACHE_SERVER_STOP,
http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 66f1a4a..fcf7a2a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -4131,6 +4131,7 @@ public class GemFireCacheImpl
this.allAsyncEventQueues.remove(asyncQueue);
this.allVisibleAsyncEventQueues.remove(asyncQueue);
}
+ system.handleResourceEvent(ResourceEvent.ASYNCEVENTQUEUE_REMOVE, asyncQueue);
}
/* Cache API - get the conflict resolver for WAN */
http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java b/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java
index 3fb8ba2..ba3daeb 100644
--- a/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java
+++ b/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java
@@ -167,6 +167,14 @@ public interface JMXNotificationType {
DistributionConfig.GEMFIRE_PREFIX + "distributedsystem.asycn.event.queue.created";
/**
+ * Notification type which indicates that an async queue has been closed. <BR>
+ * The value of this type string is
+ * <CODE>gemfire.distributedsystem.async.event.queue.closed</CODE>.
+ */
+ public static final String ASYNC_EVENT_QUEUE_CLOSED =
+ DistributionConfig.GEMFIRE_PREFIX + "distributedsystem.async.event.queue.closed";
+
+ /**
* Notification type which indicates a GemFire system generated alert <BR>
* The value of this type string is <CODE>system.alert</CODE>.
*/
http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
index d5bfedb..151aa9d 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
@@ -187,6 +187,8 @@ public interface ManagementConstants {
public static final String ASYNC_EVENT_QUEUE_CREATED_PREFIX =
"Async Event Queue is Created in the VM ";
+ public static final String ASYNC_EVENT_QUEUE_CLOSED_PREFIX =
+ "Async Event Queue is Closed in the VM ";
public static final String CACHE_SERVICE_CREATED_PREFIX = "Cache Service Created With Name ";
http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java
index 2f2757f..7cea8e9 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java
@@ -107,4 +107,7 @@ public class AsyncEventQueueMBean extends NotificationBroadcasterSupport
return bridge.getEventQueueSize();
}
+ public void stopMonitor() {
+ bridge.stopMonitor();
+ }
}
http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java
index e57bc55..83cfb88 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java
@@ -114,4 +114,7 @@ public class AsyncEventQueueMBeanBridge {
}
}
+ public void stopMonitor() {
+ monitor.stopListener();
+ }
}
http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
index f601a9a..183a5a8 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
@@ -592,6 +592,42 @@ public class ManagementAdapter {
}
/**
+ * Handles AsyncEventQueue removal: stops the queue MBean's monitor, unregisters the MBean and
+ * sends an ASYNC_EVENT_QUEUE_CLOSED notification. Returns early if no local MBean is found.
+ * @param queue The AsyncEventQueue being removed
+ */
+ public void handleAsyncEventQueueRemoval(AsyncEventQueue queue) throws ManagementException {
+ if (!isServiceInitialised("handleAsyncEventQueueRemoval")) {
+ return;
+ }
+
+ ObjectName asycnEventQueueMBeanName = MBeanJMXAdapter.getAsycnEventQueueMBeanName(
+ cacheImpl.getDistributedSystem().getDistributedMember(), queue.getId());
+ AsyncEventQueueMBean bean = null;
+ try {
+ bean = (AsyncEventQueueMBean) service.getLocalAsyncEventQueueMXBean(queue.getId());
+ if (bean == null) {
+ return;
+ }
+ } catch (ManagementException e) {
+ // If the bean lookup fails, removal is a no-op (the failure is only logged at debug level)
+ if (logger.isDebugEnabled()) {
+ logger.debug(e.getMessage(), e);
+ }
+ return;
+ }
+
+ bean.stopMonitor();
+
+ service.unregisterMBean(asycnEventQueueMBeanName);
+
+ Notification notification = new Notification(JMXNotificationType.ASYNC_EVENT_QUEUE_CLOSED,
+ memberSource, SequenceNumber.next(), System.currentTimeMillis(),
+ ManagementConstants.ASYNC_EVENT_QUEUE_CLOSED_PREFIX + queue.getId());
+ memberLevelNotifEmitter.sendNotification(notification);
+ }
+
+ /**
* Sends the alert with the Object source as member. This notification will get filtered out for
* particular alert level
*
http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
index 12392b5..d841122 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
@@ -185,6 +185,10 @@ public class ManagementListener implements ResourceEventsListener {
AsyncEventQueue queue = (AsyncEventQueue) resource;
adapter.handleAsyncEventQueueCreation(queue);
break;
+ case ASYNCEVENTQUEUE_REMOVE:
+ AsyncEventQueue removedQueue = (AsyncEventQueue) resource;
+ adapter.handleAsyncEventQueueRemoval(removedQueue);
+ break;
case SYSTEM_ALERT:
AlertDetails details = (AlertDetails) resource;
adapter.handleSystemNotification(details);
http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index f0704e8..c8de7dc 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -3470,6 +3470,12 @@ public class WANTestBase extends JUnit4DistributedTestCase {
}
}
+ public static void destroyAsyncEventQueue(String id) {
+ AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(id);
+ assertNotNull(aeq);
+ aeq.destroy();
+ }
+
protected static void verifyListenerEvents(final long expectedNumEvents) {
Awaitility.await().atMost(60, TimeUnit.SECONDS)
.until(() -> listener1.getNumEvents() == expectedNumEvents);
http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
index 03c0d62..880648d 100644
--- a/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
@@ -14,21 +14,19 @@
*/
package org.apache.geode.management;
+import org.apache.geode.management.internal.SystemManagementService;
import org.junit.experimental.categories.Category;
import org.junit.Test;
import static org.junit.Assert.*;
import org.awaitility.Awaitility;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DistributedTest;
import org.apache.geode.test.junit.categories.FlakyTest;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.geode.cache.Cache;
@@ -36,7 +34,6 @@ import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.Locator;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.wan.WANTestBase;
-import org.apache.geode.management.internal.MBeanJMXAdapter;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
@@ -215,16 +212,50 @@ public class WANManagementDUnitTest extends ManagementTestBase {
1, 100, false));
nyReceiver.invoke(() -> WANTestBase.createReceiver());
- checkAsyncQueueMBean(puneSender);
- checkAsyncQueueMBean(managing);
+ checkAsyncQueueMBean(puneSender, true);
+ checkAsyncQueueMBean(managing, true);
DistributedMember puneMember =
(DistributedMember) puneSender.invoke(() -> WANManagementDUnitTest.getMember());
- checkProxyAsyncQueue(managing, puneMember);
+ checkProxyAsyncQueue(managing, puneMember, true);
}
+ @Test
+ public void testCreateDestroyAsyncEventQueue() throws Exception {
+ VM memberVM = getManagedNodeList().get(2);
+ VM managerVm = getManagingNode();
+ VM locatorVm = Host.getLocator();
+
+ int locatorPort = (Integer) locatorVm.invoke(() -> WANManagementDUnitTest.getLocatorPort());
+
+ memberVM.invoke(() -> WANTestBase.createCache(locatorPort));
+ managerVm.invoke(() -> WANTestBase.createManagementCache(locatorPort));
+ startManagingNode(managerVm);
+
+ // Create the AsyncEventQueue in both VMs; the check* helpers below look up the id "pn"
+ String aeqId = "pn";
+ memberVM.invoke(
+ () -> WANTestBase.createAsyncEventQueue(aeqId, false, 100, 100, false, false, null, false));
+ managerVm.invoke(
+ () -> WANTestBase.createAsyncEventQueue(aeqId, false, 100, 100, false, false, null, false));
+
+ // Verify each VM's local AsyncEventQueueMXBean and the manager's proxy for the member exist
+ checkAsyncQueueMBean(memberVM, true);
+ checkAsyncQueueMBean(managerVm, true);
+ DistributedMember member = memberVM.invoke(() -> WANManagementDUnitTest.getMember());
+ checkProxyAsyncQueue(managerVm, member, true);
+
+ // Destroy the AsyncEventQueue in both the member and the manager
+ memberVM.invoke(() -> WANTestBase.destroyAsyncEventQueue(aeqId));
+ managerVm.invoke(() -> WANTestBase.destroyAsyncEventQueue(aeqId));
+
+ // Verify the beans and proxy are gone. NOTE(review): the proxy check does not poll - confirm
+ checkAsyncQueueMBean(memberVM, false);
+ checkAsyncQueueMBean(managerVm, false);
+ checkProxyAsyncQueue(managerVm, member, false);
+ }
@SuppressWarnings("serial")
protected void checkSenderNavigationAPIS(final VM vm, final DistributedMember senderMember) {
@@ -448,15 +479,18 @@ public class WANManagementDUnitTest extends ManagementTestBase {
* @param vm reference to VM
*/
@SuppressWarnings("serial")
- protected void checkAsyncQueueMBean(final VM vm) {
+ protected void checkAsyncQueueMBean(final VM vm, final boolean shouldExist) {
SerializableRunnable checkAsyncQueueMBean =
new SerializableRunnable("Check Async Queue MBean") {
public void run() {
Cache cache = GemFireCacheImpl.getInstance();
ManagementService service = ManagementService.getManagementService(cache);
AsyncEventQueueMXBean bean = service.getLocalAsyncEventQueueMXBean("pn");
- assertNotNull(bean);
- // Already in started State
+ if (shouldExist) {
+ assertNotNull(bean);
+ } else {
+ assertNull(bean);
+ }
}
};
vm.invoke(checkAsyncQueueMBean);
@@ -468,28 +502,36 @@ public class WANManagementDUnitTest extends ManagementTestBase {
* @param vm reference to VM
*/
@SuppressWarnings("serial")
- protected void checkProxyAsyncQueue(final VM vm, final DistributedMember senderMember) {
+ protected void checkProxyAsyncQueue(final VM vm, final DistributedMember senderMember,
+ final boolean shouldExist) {
SerializableRunnable checkProxyAsyncQueue =
new SerializableRunnable("Check Proxy Async Queue") {
public void run() {
Cache cache = GemFireCacheImpl.getInstance();
- ManagementService service = ManagementService.getManagementService(cache);
- AsyncEventQueueMXBean bean = null;
- try {
- bean = MBeanUtil.getAsyncEventQueueMBeanProxy(senderMember, "pn");
- } catch (Exception e) {
- fail("Could not obtain Sender Proxy in desired time " + e);
- }
- assertNotNull(bean);
+ SystemManagementService service =
+ (SystemManagementService) ManagementService.getManagementService(cache);
final ObjectName queueMBeanName =
service.getAsyncEventQueueMBeanName(senderMember, "pn");
-
- try {
- MBeanUtil.printBeanDetails(queueMBeanName);
- } catch (Exception e) {
- fail("Error while Printing Bean Details " + e);
+ AsyncEventQueueMXBean bean = null;
+ if (shouldExist) {
+ // Verify the MBean proxy exists
+ try {
+ bean = MBeanUtil.getAsyncEventQueueMBeanProxy(senderMember, "pn");
+ } catch (Exception e) {
+ fail("Could not obtain Sender Proxy in desired time " + e);
+ }
+ assertNotNull(bean);
+
+ try {
+ MBeanUtil.printBeanDetails(queueMBeanName);
+ } catch (Exception e) {
+ fail("Error while Printing Bean Details " + e);
+ }
+ } else {
+ // Verify the MBean proxy doesn't exist
+ bean = service.getMBeanProxy(queueMBeanName, AsyncEventQueueMXBean.class);
+ assertNull(bean);
}
-
}
};
vm.invoke(checkProxyAsyncQueue);