You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2017/03/09 20:48:47 UTC

[08/50] [abbrv] geode git commit: GEODE-2568: The JMX MBean is now removed when its AsyncEventQueue is removed

GEODE-2568: The JMX MBean is now removed when its AsyncEventQueue is removed


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/e5121aba
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/e5121aba
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/e5121aba

Branch: refs/heads/feature/GEODE-1969
Commit: e5121abad3b4c11620b0be037ada4484e7248875
Parents: 4e61799
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Wed Mar 1 13:20:31 2017 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Fri Mar 3 15:08:53 2017 -0800

----------------------------------------------------------------------
 .../distributed/internal/ResourceEvent.java     |  1 +
 .../geode/internal/cache/GemFireCacheImpl.java  |  1 +
 .../geode/management/JMXNotificationType.java   |  8 ++
 .../internal/ManagementConstants.java           |  2 +
 .../internal/beans/AsyncEventQueueMBean.java    |  3 +
 .../beans/AsyncEventQueueMBeanBridge.java       |  3 +
 .../internal/beans/ManagementAdapter.java       | 36 ++++++++
 .../internal/beans/ManagementListener.java      |  4 +
 .../geode/internal/cache/wan/WANTestBase.java   |  6 ++
 .../management/WANManagementDUnitTest.java      | 92 ++++++++++++++------
 10 files changed, 131 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
index acfb157..b004113 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
@@ -34,6 +34,7 @@ public enum ResourceEvent {
   MANAGER_STOP,
   LOCATOR_START,
   ASYNCEVENTQUEUE_CREATE,
+  ASYNCEVENTQUEUE_REMOVE,
   SYSTEM_ALERT,
   CACHE_SERVER_START,
   CACHE_SERVER_STOP,

http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 66f1a4a..fcf7a2a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -4131,6 +4131,7 @@ public class GemFireCacheImpl
       this.allAsyncEventQueues.remove(asyncQueue);
       this.allVisibleAsyncEventQueues.remove(asyncQueue);
     }
+    system.handleResourceEvent(ResourceEvent.ASYNCEVENTQUEUE_REMOVE, asyncQueue);
   }
 
   /* Cache API - get the conflict resolver for WAN */

http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java b/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java
index 3fb8ba2..ba3daeb 100644
--- a/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java
+++ b/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java
@@ -167,6 +167,14 @@ public interface JMXNotificationType {
       DistributionConfig.GEMFIRE_PREFIX + "distributedsystem.asycn.event.queue.created";
 
   /**
+   * Notification type which indicates that an async queue has been closed. <BR>
+   * The value of this type string is
+   * <CODE>gemfire.distributedsystem.async.event.queue.closed</CODE>.
+   */
+  public static final String ASYNC_EVENT_QUEUE_CLOSED =
+      DistributionConfig.GEMFIRE_PREFIX + "distributedsystem.async.event.queue.closed";
+
+  /**
    * Notification type which indicates a GemFire system generated alert <BR>
    * The value of this type string is <CODE>system.alert</CODE>.
    */

http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
index d5bfedb..151aa9d 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
@@ -187,6 +187,8 @@ public interface ManagementConstants {
 
   public static final String ASYNC_EVENT_QUEUE_CREATED_PREFIX =
       "Async Event Queue is Created  in the VM ";
+  public static final String ASYNC_EVENT_QUEUE_CLOSED_PREFIX =
+      "Async Event Queue is Closed in the VM ";
 
   public static final String CACHE_SERVICE_CREATED_PREFIX = "Cache Service Created With Name ";
 

http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java
index 2f2757f..7cea8e9 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java
@@ -107,4 +107,7 @@ public class AsyncEventQueueMBean extends NotificationBroadcasterSupport
     return bridge.getEventQueueSize();
   }
 
+  public void stopMonitor() {
+    bridge.stopMonitor();
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java
index e57bc55..83cfb88 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java
@@ -114,4 +114,7 @@ public class AsyncEventQueueMBeanBridge {
     }
   }
 
+  public void stopMonitor() {
+    monitor.stopListener();
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
index f601a9a..183a5a8 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
@@ -592,6 +592,42 @@ public class ManagementAdapter {
   }
 
   /**
+   * Handles AsyncEventQueue Removal
+   *
+   * @param queue The AsyncEventQueue being removed
+   */
+  public void handleAsyncEventQueueRemoval(AsyncEventQueue queue) throws ManagementException {
+    if (!isServiceInitialised("handleAsyncEventQueueRemoval")) {
+      return;
+    }
+
+    ObjectName asycnEventQueueMBeanName = MBeanJMXAdapter.getAsycnEventQueueMBeanName(
+        cacheImpl.getDistributedSystem().getDistributedMember(), queue.getId());
+    AsyncEventQueueMBean bean = null;
+    try {
+      bean = (AsyncEventQueueMBean) service.getLocalAsyncEventQueueMXBean(queue.getId());
+      if (bean == null) {
+        return;
+      }
+    } catch (ManagementException e) {
+      // If no bean found its a NO-OP
+      if (logger.isDebugEnabled()) {
+        logger.debug(e.getMessage(), e);
+      }
+      return;
+    }
+
+    bean.stopMonitor();
+
+    service.unregisterMBean(asycnEventQueueMBeanName);
+
+    Notification notification = new Notification(JMXNotificationType.ASYNC_EVENT_QUEUE_CLOSED,
+        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
+        ManagementConstants.ASYNC_EVENT_QUEUE_CLOSED_PREFIX + queue.getId());
+    memberLevelNotifEmitter.sendNotification(notification);
+  }
+
+  /**
    * Sends the alert with the Object source as member. This notification will get filtered out for
    * particular alert level
    * 

http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
index 12392b5..d841122 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
@@ -185,6 +185,10 @@ public class ManagementListener implements ResourceEventsListener {
         AsyncEventQueue queue = (AsyncEventQueue) resource;
         adapter.handleAsyncEventQueueCreation(queue);
         break;
+      case ASYNCEVENTQUEUE_REMOVE:
+        AsyncEventQueue removedQueue = (AsyncEventQueue) resource;
+        adapter.handleAsyncEventQueueRemoval(removedQueue);
+        break;
       case SYSTEM_ALERT:
         AlertDetails details = (AlertDetails) resource;
         adapter.handleSystemNotification(details);

http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index f0704e8..c8de7dc 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -3470,6 +3470,12 @@ public class WANTestBase extends JUnit4DistributedTestCase {
     }
   }
 
+  public static void destroyAsyncEventQueue(String id) {
+    AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(id);
+    assertNotNull(aeq);
+    aeq.destroy();
+  }
+
   protected static void verifyListenerEvents(final long expectedNumEvents) {
     Awaitility.await().atMost(60, TimeUnit.SECONDS)
         .until(() -> listener1.getNumEvents() == expectedNumEvents);

http://git-wip-us.apache.org/repos/asf/geode/blob/e5121aba/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
index 03c0d62..880648d 100644
--- a/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
@@ -14,21 +14,19 @@
  */
 package org.apache.geode.management;
 
+import org.apache.geode.management.internal.SystemManagementService;
 import org.junit.experimental.categories.Category;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
 import org.awaitility.Awaitility;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.FlakyTest;
 
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.geode.cache.Cache;
@@ -36,7 +34,6 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.wan.WANTestBase;
-import org.apache.geode.management.internal.MBeanJMXAdapter;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
@@ -215,16 +212,50 @@ public class WANManagementDUnitTest extends ManagementTestBase {
         1, 100, false));
     nyReceiver.invoke(() -> WANTestBase.createReceiver());
 
-    checkAsyncQueueMBean(puneSender);
-    checkAsyncQueueMBean(managing);
+    checkAsyncQueueMBean(puneSender, true);
+    checkAsyncQueueMBean(managing, true);
 
     DistributedMember puneMember =
         (DistributedMember) puneSender.invoke(() -> WANManagementDUnitTest.getMember());
 
-    checkProxyAsyncQueue(managing, puneMember);
+    checkProxyAsyncQueue(managing, puneMember, true);
 
   }
 
+  @Test
+  public void testCreateDestroyAsyncEventQueue() throws Exception {
+    VM memberVM = getManagedNodeList().get(2);
+    VM managerVm = getManagingNode();
+    VM locatorVm = Host.getLocator();
+
+    int locatorPort = (Integer) locatorVm.invoke(() -> WANManagementDUnitTest.getLocatorPort());
+
+    memberVM.invoke(() -> WANTestBase.createCache(locatorPort));
+    managerVm.invoke(() -> WANTestBase.createManagementCache(locatorPort));
+    startManagingNode(managerVm);
+
+    // Create AsyncEventQueue
+    String aeqId = "pn";
+    memberVM.invoke(
+        () -> WANTestBase.createAsyncEventQueue(aeqId, false, 100, 100, false, false, null, false));
+    managerVm.invoke(
+        () -> WANTestBase.createAsyncEventQueue(aeqId, false, 100, 100, false, false, null, false));
+
+    // Verify AsyncEventQueueMXBean exists
+    checkAsyncQueueMBean(memberVM, true);
+    checkAsyncQueueMBean(managerVm, true);
+    DistributedMember member = memberVM.invoke(() -> WANManagementDUnitTest.getMember());
+    checkProxyAsyncQueue(managerVm, member, true);
+
+    // Destroy AsyncEventQueue
+    memberVM.invoke(() -> WANTestBase.destroyAsyncEventQueue(aeqId));
+    managerVm.invoke(() -> WANTestBase.destroyAsyncEventQueue(aeqId));
+
+    // Verify AsyncEventQueueMXBean no longer exists
+    checkAsyncQueueMBean(memberVM, false);
+    checkAsyncQueueMBean(managerVm, false);
+    checkProxyAsyncQueue(managerVm, member, false);
+  }
 
   @SuppressWarnings("serial")
   protected void checkSenderNavigationAPIS(final VM vm, final DistributedMember senderMember) {
@@ -448,15 +479,18 @@ public class WANManagementDUnitTest extends ManagementTestBase {
    * @param vm reference to VM
    */
   @SuppressWarnings("serial")
-  protected void checkAsyncQueueMBean(final VM vm) {
+  protected void checkAsyncQueueMBean(final VM vm, final boolean shouldExist) {
     SerializableRunnable checkAsyncQueueMBean =
         new SerializableRunnable("Check Async Queue MBean") {
           public void run() {
             Cache cache = GemFireCacheImpl.getInstance();
             ManagementService service = ManagementService.getManagementService(cache);
             AsyncEventQueueMXBean bean = service.getLocalAsyncEventQueueMXBean("pn");
-            assertNotNull(bean);
-            // Already in started State
+            if (shouldExist) {
+              assertNotNull(bean);
+            } else {
+              assertNull(bean);
+            }
           }
         };
     vm.invoke(checkAsyncQueueMBean);
@@ -468,28 +502,36 @@ public class WANManagementDUnitTest extends ManagementTestBase {
    * @param vm reference to VM
    */
   @SuppressWarnings("serial")
-  protected void checkProxyAsyncQueue(final VM vm, final DistributedMember senderMember) {
+  protected void checkProxyAsyncQueue(final VM vm, final DistributedMember senderMember,
+      final boolean shouldExist) {
     SerializableRunnable checkProxyAsyncQueue =
         new SerializableRunnable("Check Proxy Async Queue") {
           public void run() {
             Cache cache = GemFireCacheImpl.getInstance();
-            ManagementService service = ManagementService.getManagementService(cache);
-            AsyncEventQueueMXBean bean = null;
-            try {
-              bean = MBeanUtil.getAsyncEventQueueMBeanProxy(senderMember, "pn");
-            } catch (Exception e) {
-              fail("Could not obtain Sender Proxy in desired time " + e);
-            }
-            assertNotNull(bean);
+            SystemManagementService service =
+                (SystemManagementService) ManagementService.getManagementService(cache);
             final ObjectName queueMBeanName =
                 service.getAsyncEventQueueMBeanName(senderMember, "pn");
-
-            try {
-              MBeanUtil.printBeanDetails(queueMBeanName);
-            } catch (Exception e) {
-              fail("Error while Printing Bean Details " + e);
+            AsyncEventQueueMXBean bean = null;
+            if (shouldExist) {
+              // Verify the MBean proxy exists
+              try {
+                bean = MBeanUtil.getAsyncEventQueueMBeanProxy(senderMember, "pn");
+              } catch (Exception e) {
+                fail("Could not obtain Sender Proxy in desired time " + e);
+              }
+              assertNotNull(bean);
+
+              try {
+                MBeanUtil.printBeanDetails(queueMBeanName);
+              } catch (Exception e) {
+                fail("Error while Printing Bean Details " + e);
+              }
+            } else {
+              // Verify the MBean proxy doesn't exist
+              bean = service.getMBeanProxy(queueMBeanName, AsyncEventQueueMXBean.class);
+              assertNull(bean);
             }
-
           }
         };
     vm.invoke(checkProxyAsyncQueue);