You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/12/14 17:58:03 UTC

[48/50] [abbrv] geode git commit: Convert from ManagementTestCase to ManagementTestRule

http://git-wip-us.apache.org/repos/asf/geode/blob/914cbfdc/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java
index bcfadde..50533d3 100644
--- a/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java
@@ -14,24 +14,25 @@
  */
 package org.apache.geode.management;
 
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
+import static java.util.concurrent.TimeUnit.*;
+import static org.assertj.core.api.Assertions.*;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.List;
+import java.io.Serializable;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import javax.management.ObjectName;
 
-import org.apache.geode.LogWriter;
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.core.ConditionFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.DiskStore;
@@ -40,674 +41,366 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.internal.cache.DiskRegion;
 import org.apache.geode.internal.cache.DiskRegionStats;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
-import org.apache.geode.management.internal.MBeanJMXAdapter;
+import org.apache.geode.internal.process.ProcessUtils;
+import org.apache.geode.management.internal.SystemManagementService;
 import org.apache.geode.test.dunit.AsyncInvocation;
-import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
 
 /**
  * Test cases to cover all test cases which pertains to disk from Management layer
- * 
- * 
  */
 @Category(DistributedTest.class)
-public class DiskManagementDUnitTest extends ManagementTestBase {
-
-  /**
-   * 
-   */
-  private static final long serialVersionUID = 1L;
-
-  // This must be bigger than the dunit ack-wait-threshold for the revoke
-  // tests. The command line is setting the ack-wait-threshold to be
-  // 60 seconds.
-  private static final int MAX_WAIT = 70 * 1000;
-
-  boolean testFailed = false;
+@SuppressWarnings({"serial", "unused"})
+public class DiskManagementDUnitTest implements Serializable {
 
-  String failureCause = "";
-  static final String REGION_NAME = "region";
+  private static final String REGION_NAME =
+      DiskManagementDUnitTest.class.getSimpleName() + "_region";
 
   private File diskDir;
 
-  protected static LogWriter logWriter;
+  @Manager
+  private VM managerVM;
 
-  public DiskManagementDUnitTest() throws Exception {
-    super();
+  @Member
+  private VM[] memberVMs;
 
-    diskDir = new File("diskDir-" + getName()).getAbsoluteFile();
-    org.apache.geode.internal.FileUtil.delete(diskDir);
-    diskDir.mkdir();
-    diskDir.deleteOnExit();
-  }
+  @Rule
+  public ManagementTestRule managementTestRule = ManagementTestRule.builder().start(true).build();
 
-  @Override
-  protected final void postSetUpManagementTestBase() throws Exception {
-    failureCause = "";
-    testFailed = false;
-  }
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
 
-  @Override
-  protected final void postTearDownManagementTestBase() throws Exception {
-    org.apache.geode.internal.FileUtil.delete(diskDir);
+  @Before
+  public void before() throws Exception {
+    this.diskDir = this.temporaryFolder.newFolder("diskDir");
   }
 
   /**
-   * Tests Disk Compaction from a MemberMbean which is at cache level. All the disks which belong to
-   * the cache should be compacted.
-   * 
-   * @throws Exception
+   * Tests Disk Compaction from a MemberMXBean which is at cache level. All the disks which belong
+   * to the cache should be compacted.
    */
-
   @Test
-  public void testDiskCompact() throws Throwable {
-    initManagement(false);
-    for (VM vm : getManagedNodeList()) {
-      createPersistentRegion(vm);
-      makeDiskCompactable(vm);
+  public void testDiskCompact() throws Exception {
+    for (VM memberVM : this.memberVMs) {
+      createPersistentRegion(memberVM);
+      makeDiskCompactable(memberVM);
     }
 
-    for (VM vm : getManagedNodeList()) {
-      compactAllDiskStores(vm);
+    for (VM memberVM : this.memberVMs) {
+      compactAllDiskStores(memberVM);
     }
-
   }
 
   /**
-   * Tests Disk Compaction from a MemberMbean which is at cache level. All the disks which belong to
-   * the cache should be compacted.
-   * 
-   * @throws Exception
+   * Tests Disk Compaction from a MemberMXBean which is at cache level. All the disks which belong
+   * to the cache should be compacted.
    */
-
   @Test
-  public void testDiskCompactRemote() throws Throwable {
-
-    initManagement(false);
-    for (VM vm : getManagedNodeList()) {
-      createPersistentRegion(vm);
-      makeDiskCompactable(vm);
+  public void testDiskCompactRemote() throws Exception {
+    for (VM memberVM : this.memberVMs) {
+      createPersistentRegion(memberVM);
+      makeDiskCompactable(memberVM);
     }
-    compactDiskStoresRemote(managingNode);
 
+    compactDiskStoresRemote(this.managerVM, this.memberVMs.length);
   }
 
   /**
    * Tests various operations defined on DiskStore Mbean
-   * 
-   * @throws Exception
    */
-
   @Test
-  public void testDiskOps() throws Throwable {
-
-    initManagement(false);
-    for (VM vm : getManagedNodeList()) {
-      createPersistentRegion(vm);
-      makeDiskCompactable(vm);
-      invokeFlush(vm);
-      invokeForceRoll(vm);
-      invokeForceCompaction(vm);
+  public void testDiskOps() throws Exception {
+    for (VM memberVM : this.memberVMs) {
+      createPersistentRegion(memberVM);
+      makeDiskCompactable(memberVM);
+      invokeFlush(memberVM);
+      invokeForceRoll(memberVM);
+      invokeForceCompaction(memberVM);
     }
-
   }
 
   @Test
-  public void testDiskBackupAllMembers() throws Throwable {
-    initManagement(false);
-    for (VM vm : getManagedNodeList()) {
-      createPersistentRegion(vm);
-      makeDiskCompactable(vm);
-
+  public void testDiskBackupAllMembers() throws Exception {
+    for (VM memberVM : this.memberVMs) {
+      createPersistentRegion(memberVM);
+      makeDiskCompactable(memberVM);
     }
-    backupAllMembers(managingNode);
+
+    backupAllMembers(this.managerVM, this.memberVMs.length);
   }
 
   /**
-   * Checks the test case of missing disks and revoking them through MemberMbean interfaces
-   * 
-   * @throws Throwable
+   * Checks the test case of missing disks and revoking them through MemberMXBean interfaces
    */
-  @SuppressWarnings("serial")
   @Test
-  public void testMissingMembers() throws Throwable {
+  public void testMissingMembers() throws Exception {
+    VM memberVM1 = this.memberVMs[0];
+    VM memberVM2 = this.memberVMs[1];
 
-    initManagement(false);
-    VM vm0 = getManagedNodeList().get(0);
-    VM vm1 = getManagedNodeList().get(1);
-    VM vm2 = getManagedNodeList().get(2);
+    createPersistentRegion(memberVM1);
+    createPersistentRegion(memberVM2);
 
-    org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("Creating region in VM0");
-    createPersistentRegion(vm0);
-    org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("Creating region in VM1");
-    createPersistentRegion(vm1);
+    putAnEntry(memberVM1);
 
-    putAnEntry(vm0);
+    this.managerVM.invoke("checkForMissingDiskStores", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+      DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+      PersistentMemberDetails[] missingDiskStores = distributedSystemMXBean.listMissingDiskStores();
 
+      assertThat(missingDiskStores).isNull();
+    });
 
-    managingNode.invoke(new SerializableRunnable("Check for waiting regions") {
+    closeRegion(memberVM1);
 
-      public void run() {
-        Cache cache = getCache();
-        ManagementService service = getManagementService();
-        DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-        PersistentMemberDetails[] missingDiskStores = bean.listMissingDiskStores();
+    updateTheEntry(memberVM2, "C");
 
-        assertNull(missingDiskStores);
-      }
-    });
+    closeRegion(memberVM2);
 
-    org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("closing region in vm0");
-    closeRegion(vm0);
-
-    updateTheEntry(vm1);
-
-    org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("closing region in vm1");
-    closeRegion(vm1);
-    AsyncInvocation future = createPersistentRegionAsync(vm0);
-    waitForBlockedInitialization(vm0);
-    assertTrue(future.isAlive());
-
-    managingNode.invoke(new SerializableRunnable("Revoke the member") {
-
-      public void run() {
-        Cache cache = getCache();
-        GemFireCacheImpl cacheImpl = (GemFireCacheImpl) cache;
-        ManagementService service = getManagementService();
-        DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-        PersistentMemberDetails[] missingDiskStores = bean.listMissingDiskStores();
-        org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-            .info("waiting members=" + missingDiskStores);
-        assertNotNull(missingDiskStores);
-        assertEquals(1, missingDiskStores.length);
-
-        for (PersistentMemberDetails id : missingDiskStores) {
-          org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-              .info("Missing DiskStoreID is =" + id.getDiskStoreId());
-          org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-              .info("Missing Host is =" + id.getHost());
-          org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-              .info("Missing Directory is =" + id.getDirectory());
-
-          try {
-            bean.revokeMissingDiskStores(id.getDiskStoreId());
-          } catch (Exception e) {
-            fail("revokeMissingDiskStores failed with exception " + e);
-          }
-        }
-      }
-    });
+    AsyncInvocation creatingPersistentRegionAsync = createPersistentRegionAsync(memberVM1);
 
-    future.join(MAX_WAIT);
-    if (future.isAlive()) {
-      fail("Region not created within" + MAX_WAIT);
-    }
-    if (future.exceptionOccurred()) {
-      throw new Exception(future.getException());
-    }
-    checkForRecoveryStat(vm0, true);
-    // Check to make sure we recovered the old
-    // value of the entry.
-    SerializableRunnable checkForEntry = new SerializableRunnable("check for the entry") {
-
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion(REGION_NAME);
-        assertEquals("B", region.get("A"));
-      }
-    };
-    vm0.invoke(checkForEntry);
+    memberVM1.invoke(() -> await().until(() -> {
+      GemFireCacheImpl cache = (GemFireCacheImpl) this.managementTestRule.getCache();
+      PersistentMemberManager persistentMemberManager = cache.getPersistentMemberManager();
+      Map<String, Set<PersistentMemberID>> regions = persistentMemberManager.getWaitingRegions();
+      return !regions.isEmpty();
+    }));
 
-  }
+    assertThat(creatingPersistentRegionAsync.isAlive()).isTrue();
 
-  protected void checkNavigation(final VM vm, final DistributedMember diskMember,
-      final String diskStoreName) {
-    SerializableRunnable checkNavigation = new SerializableRunnable("Check Navigation") {
-      public void run() {
+    this.managerVM.invoke("revokeMissingDiskStore", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+      DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
+      PersistentMemberDetails[] missingDiskStores = bean.listMissingDiskStores();
 
-        final ManagementService service = getManagementService();
+      assertThat(missingDiskStores).isNotNull().hasSize(1);
 
-        DistributedSystemMXBean disMBean = service.getDistributedSystemMXBean();
-        try {
-          ObjectName expected =
-              MBeanJMXAdapter.getDiskStoreMBeanName(diskMember.getId(), diskStoreName);
-          ObjectName actual = disMBean.fetchDiskStoreObjectName(diskMember.getId(), diskStoreName);
-          assertEquals(expected, actual);
-        } catch (Exception e) {
-          fail("Disk Store Navigation Failed " + e);
-        }
+      assertThat(bean.revokeMissingDiskStores(missingDiskStores[0].getDiskStoreId())).isTrue();
+    });
 
+    await(creatingPersistentRegionAsync);
 
-      }
-    };
-    vm.invoke(checkNavigation);
-  }
+    verifyRecoveryStats(memberVM1, true);
 
-  /**
-   * get Distributed member for a given vm
-   */
-  @SuppressWarnings("serial")
-  protected static DistributedMember getMember() throws Exception {
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-    return cache.getDistributedSystem().getDistributedMember();
+    // Check to make sure we recovered the old value of the entry.
+    memberVM1.invoke("check for the entry", () -> {
+      Cache cache = this.managementTestRule.getCache();
+      Region region = cache.getRegion(REGION_NAME);
+      assertThat(region.get("A")).isEqualTo("B");
+    });
   }
 
   /**
    * Invokes flush on the given disk store by MBean interface
-   * 
-   * @param vm reference to VM
    */
-  @SuppressWarnings("serial")
-  public void invokeFlush(final VM vm) {
-    SerializableRunnable invokeFlush = new SerializableRunnable("Invoke Flush On Disk") {
-      public void run() {
-        Cache cache = getCache();
-        DiskStoreFactory dsf = cache.createDiskStoreFactory();
-        String name = "testFlush_" + vm.getPid();
-        DiskStore ds = dsf.create(name);
-
-        ManagementService service = getManagementService();
-        DiskStoreMXBean bean = service.getLocalDiskStoreMBean(name);
-        assertNotNull(bean);
-        bean.flush();
-      }
-    };
-    vm.invoke(invokeFlush);
+  private void invokeFlush(final VM memberVM) {
+    memberVM.invoke("invokeFlush", () -> {
+      Cache cache = this.managementTestRule.getCache();
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      String name = "testFlush_" + ProcessUtils.identifyPid();
+      DiskStore diskStore = diskStoreFactory.create(name);
+
+      ManagementService service = this.managementTestRule.getManagementService();
+      DiskStoreMXBean diskStoreMXBean = service.getLocalDiskStoreMBean(name);
+      assertThat(diskStoreMXBean).isNotNull();
+      assertThat(diskStoreMXBean.getName()).isEqualTo(diskStore.getName());
+
+      diskStoreMXBean.flush();
+    });
   }
 
   /**
    * Invokes force roll on disk store by MBean interface
-   * 
-   * @param vm reference to VM
    */
-  @SuppressWarnings("serial")
-  public void invokeForceRoll(final VM vm) {
-    SerializableRunnable invokeForceRoll = new SerializableRunnable("Invoke Force Roll") {
-      public void run() {
-        Cache cache = getCache();
-        DiskStoreFactory dsf = cache.createDiskStoreFactory();
-        String name = "testForceRoll_" + vm.getPid();
-        DiskStore ds = dsf.create(name);
-        ManagementService service = getManagementService();
-        DiskStoreMXBean bean = service.getLocalDiskStoreMBean(name);
-        assertNotNull(bean);
-        bean.forceRoll();
-      }
-    };
-    vm.invoke(invokeForceRoll);
+  private void invokeForceRoll(final VM memberVM) {
+    memberVM.invoke("invokeForceRoll", () -> {
+      Cache cache = this.managementTestRule.getCache();
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      String name = "testForceRoll_" + ProcessUtils.identifyPid();
+      DiskStore diskStore = diskStoreFactory.create(name);
+
+      ManagementService service = this.managementTestRule.getManagementService();
+      DiskStoreMXBean diskStoreMXBean = service.getLocalDiskStoreMBean(name);
+      assertThat(diskStoreMXBean).isNotNull();
+      assertThat(diskStoreMXBean.getName()).isEqualTo(diskStore.getName());
+
+      diskStoreMXBean.forceRoll();
+    });
   }
 
   /**
    * Invokes force compaction on disk store by MBean interface
-   * 
-   * @param vm reference to VM
    */
-  @SuppressWarnings("serial")
-  public void invokeForceCompaction(final VM vm) {
-    SerializableRunnable invokeForceCompaction =
-        new SerializableRunnable("Invoke Force Compaction") {
-          public void run() {
-            Cache cache = getCache();
-            DiskStoreFactory dsf = cache.createDiskStoreFactory();
-            dsf.setAllowForceCompaction(true);
-            String name = "testForceCompaction_" + vm.getPid();
-            DiskStore ds = dsf.create(name);
-            ManagementService service = getManagementService();
-            DiskStoreMXBean bean = service.getLocalDiskStoreMBean(name);
-            assertNotNull(bean);
-            assertEquals(false, bean.forceCompaction());
-          }
-        };
-    vm.invoke(invokeForceCompaction);
+  private void invokeForceCompaction(final VM memberVM) {
+    memberVM.invoke("invokeForceCompaction", () -> {
+      Cache cache = this.managementTestRule.getCache();
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setAllowForceCompaction(true);
+      String name = "testForceCompaction_" + ProcessUtils.identifyPid();
+      DiskStore diskStore = dsf.create(name);
+
+      ManagementService service = this.managementTestRule.getManagementService();
+      DiskStoreMXBean diskStoreMXBean = service.getLocalDiskStoreMBean(name);
+      assertThat(diskStoreMXBean).isNotNull();
+      assertThat(diskStoreMXBean.getName()).isEqualTo(diskStore.getName());
+
+      assertThat(diskStoreMXBean.forceCompaction()).isFalse();
+    });
   }
 
   /**
    * Makes the disk compactable by adding and deleting some entries
-   * 
-   * @throws Exception
    */
-  @SuppressWarnings("serial")
-  public void makeDiskCompactable(VM vm1) throws Exception {
-    vm1.invoke(new SerializableRunnable("Make The Disk Compactable") {
-
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion(REGION_NAME);
-        DiskRegion dr = ((LocalRegion) region).getDiskRegion();
-        org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("putting key1");
-        region.put("key1", "value1");
-        org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("putting key2");
-        region.put("key2", "value2");
-        org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("removing key2");
-        region.remove("key2");
-        // now that it is compactable the following forceCompaction should
-        // go ahead and do a roll and compact it.
-      }
+  private void makeDiskCompactable(final VM memberVM) throws Exception {
+    memberVM.invoke("makeDiskCompactable", () -> {
+      Cache cache = this.managementTestRule.getCache();
+      Region region = cache.getRegion(REGION_NAME);
+      region.put("key1", "value1");
+      region.put("key2", "value2");
+      region.remove("key2");
+      // now that it is compactable the following forceCompaction should
+      // go ahead and do a roll and compact it.
     });
-
   }
 
-
-
   /**
    * Compacts all DiskStores belonging to a member
-   * 
-   * @param vm1 reference to VM
-   * @throws Exception
    */
-  @SuppressWarnings("serial")
-  public void compactAllDiskStores(VM vm1) throws Exception {
-
-    vm1.invoke(new SerializableCallable("Compact All Disk Stores") {
-
-      public Object call() throws Exception {
-        ManagementService service = getManagementService();
-        MemberMXBean memberBean = service.getMemberMXBean();
-        String[] compactedDiskStores = memberBean.compactAllDiskStores();
-
-        assertTrue(compactedDiskStores.length > 0);
-        for (int i = 0; i < compactedDiskStores.length; i++) {
-          org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-              .info("<ExpectedString> Compacted Store " + i + " " + compactedDiskStores[i]
-                  + "</ExpectedString> ");
-        }
-
-        return null;
-      }
+  private void compactAllDiskStores(final VM memberVM) throws Exception {
+    memberVM.invoke("compactAllDiskStores", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+      MemberMXBean memberMXBean = service.getMemberMXBean();
+      String[] compactedDiskStores = memberMXBean.compactAllDiskStores();
+      assertThat(compactedDiskStores).hasSize(1);
     });
-
   }
 
   /**
    * Takes a back up of all the disk store in a given directory
    */
-  @SuppressWarnings("serial")
-  public void backupAllMembers(final VM managingVM) throws Exception {
+  private void backupAllMembers(final VM managerVM, final int memberCount) {
+    managerVM.invoke("backupAllMembers", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+      DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
+      File backupDir = this.temporaryFolder.newFolder("backupDir");
 
-    managingVM.invoke(new SerializableCallable("Backup All Disk Stores") {
+      DiskBackupStatus status = bean.backupAllMembers(backupDir.getAbsolutePath(), null);
 
-      public Object call() throws Exception {
-        ManagementService service = getManagementService();
-        DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-        DiskBackupStatus status =
-            bean.backupAllMembers(getBackupDir("test_backupAllMembers").getAbsolutePath(), null);
-
-        return null;
-      }
+      assertThat(status.getBackedUpDiskStores().keySet().size()).isEqualTo(memberCount);
+      assertThat(status.getOfflineDiskStores()).isEqualTo(null); // TODO: fix GEODE-1946
     });
-
   }
 
   /**
-   * Compact a disk store from Managing node
+   * Compact a disk store from managerVM VM
    */
-  @SuppressWarnings("serial")
-  public void compactDiskStoresRemote(VM managingVM) throws Exception {
-    {
-
-      managingVM.invoke(new SerializableCallable("Compact All Disk Stores Remote") {
-
-        public Object call() throws Exception {
-          GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-          Set<DistributedMember> otherMemberSet =
-              cache.getDistributionManager().getOtherNormalDistributionManagerIds();
-
-          for (DistributedMember member : otherMemberSet) {
-            MemberMXBean bean = MBeanUtil.getMemberMbeanProxy(member);
-            String[] allDisks = bean.listDiskStores(true);
-            assertNotNull(allDisks);
-            List<String> listString = Arrays.asList(allDisks);
-            org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-                .info("<ExpectedString> Remote All Disk Stores Are  " + listString.toString()
-                    + "</ExpectedString> ");
-            String[] compactedDiskStores = bean.compactAllDiskStores();
-            assertTrue(compactedDiskStores.length > 0);
-            for (int i = 0; i < compactedDiskStores.length; i++) {
-              org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
-                  .info("<ExpectedString> Remote Compacted Store " + i + " "
-                      + compactedDiskStores[i] + "</ExpectedString> ");
-            }
-
-          }
-          return null;
-        }
-      });
+  private void compactDiskStoresRemote(final VM managerVM, final int memberCount) {
+    managerVM.invoke("compactDiskStoresRemote", () -> {
+      Set<DistributedMember> otherMemberSet = this.managementTestRule.getOtherNormalMembers();// ((GemFireCacheImpl)cache).getDistributionManager().getOtherNormalDistributionManagerIds();
+      assertThat(otherMemberSet.size()).isEqualTo(memberCount);
 
-    }
+      SystemManagementService service = this.managementTestRule.getSystemManagementService();
 
-  }
-
-  /**
-   * Checks if a file with the given extension is present
-   * 
-   * @param fileExtension file extension
-   * @throws Exception
-   */
-  protected void checkIfContainsFileWithExt(String fileExtension) throws Exception {
-    File[] files = diskDir.listFiles();
-    for (int j = 0; j < files.length; j++) {
-      if (files[j].getAbsolutePath().endsWith(fileExtension)) {
-        fail("file \"" + files[j].getAbsolutePath() + "\" still exists");
-      }
-    }
+      for (DistributedMember member : otherMemberSet) {
+        MemberMXBean memberMXBean = awaitMemberMXBeanProxy(member);
 
-  }
+        String[] allDisks = memberMXBean.listDiskStores(true);
+        assertThat(allDisks).isNotNull().hasSize(1);
 
-  /**
-   * Update Entry
-   * 
-   * @param vm1 reference to VM
-   */
-  protected void updateTheEntry(VM vm1) {
-    updateTheEntry(vm1, "C");
-  }
-
-  /**
-   * Update an Entry
-   * 
-   * @param vm1 reference to VM
-   * @param value Value which is updated
-   */
-  @SuppressWarnings("serial")
-  protected void updateTheEntry(VM vm1, final String value) {
-    vm1.invoke(new SerializableRunnable("change the entry") {
-
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion(REGION_NAME);
-        region.put("A", value);
+        String[] compactedDiskStores = memberMXBean.compactAllDiskStores();
+        assertThat(compactedDiskStores).hasSize(1);
       }
     });
   }
 
-  /**
-   * Put an entry to region
-   * 
-   * @param vm0 reference to VM
-   */
-  @SuppressWarnings("serial")
-  protected void putAnEntry(VM vm0) {
-    vm0.invoke(new SerializableRunnable("Put an entry") {
-
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion(REGION_NAME);
-        region.put("A", "B");
-      }
+  private void updateTheEntry(final VM memberVM, final String value) {
+    memberVM.invoke("updateTheEntry", () -> {
+      Cache cache = this.managementTestRule.getCache();
+      Region region = cache.getRegion(REGION_NAME);
+      region.put("A", value);
     });
   }
 
-  /**
-   * Close the given region REGION_NAME
-   * 
-   * @param vm reference to VM
-   */
-  @SuppressWarnings("serial")
-  protected void closeRegion(final VM vm) {
-    SerializableRunnable closeRegion = new SerializableRunnable("Close persistent region") {
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion(REGION_NAME);
-        region.close();
-      }
-    };
-    vm.invoke(closeRegion);
+  private void putAnEntry(final VM memberVM) {
+    memberVM.invoke("putAnEntry", () -> {
+      Cache cache = managementTestRule.getCache();
+      Region region = cache.getRegion(REGION_NAME);
+      region.put("A", "B");
+    });
   }
 
-  /**
-   * Waiting to blocked waiting for another persistent member to come online
-   * 
-   * @param vm reference to VM
-   */
-  @SuppressWarnings("serial")
-  private void waitForBlockedInitialization(VM vm) {
-    vm.invoke(new SerializableRunnable() {
-
-      public void run() {
-        Wait.waitForCriterion(new WaitCriterion() {
-
-          public String description() {
-            return "Waiting to blocked waiting for another persistent member to come online";
-          }
-
-          public boolean done() {
-            Cache cache = getCache();
-            GemFireCacheImpl cacheImpl = (GemFireCacheImpl) cache;
-            PersistentMemberManager mm = cacheImpl.getPersistentMemberManager();
-            Map<String, Set<PersistentMemberID>> regions = mm.getWaitingRegions();
-            boolean done = !regions.isEmpty();
-            return done;
-          }
-
-        }, MAX_WAIT, 100, true);
-
-      }
-
+  private void closeRegion(final VM memberVM) {
+    memberVM.invoke("closeRegion", () -> {
+      Cache cache = this.managementTestRule.getCache();
+      Region region = cache.getRegion(REGION_NAME);
+      region.close();
     });
   }
 
-  /**
-   * Creates a persistent region
-   * 
-   * @param vm reference to VM
-   * @throws Throwable
-   */
-  protected void createPersistentRegion(VM vm) throws Throwable {
-    AsyncInvocation future = createPersistentRegionAsync(vm);
-    future.join(MAX_WAIT);
-    if (future.isAlive()) {
-      fail("Region not created within" + MAX_WAIT);
-    }
-    if (future.exceptionOccurred()) {
-      throw new RuntimeException(future.getException());
-    }
+  private void createPersistentRegion(final VM memberVM)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    await(createPersistentRegionAsync(memberVM));
   }
 
-  /**
-   * Creates a persistent region in async manner
-   * 
-   * @param vm reference to VM
-   * @return reference to AsyncInvocation
-   */
-  @SuppressWarnings("serial")
-  protected AsyncInvocation createPersistentRegionAsync(final VM vm) {
-    SerializableRunnable createRegion = new SerializableRunnable("Create persistent region") {
-      public void run() {
-        Cache cache = getCache();
-        DiskStoreFactory dsf = cache.createDiskStoreFactory();
-        File dir = getDiskDirForVM(vm);
-        dir.mkdirs();
-        dsf.setDiskDirs(new File[] {dir});
-        dsf.setMaxOplogSize(1);
-        dsf.setAllowForceCompaction(true);
-        dsf.setAutoCompact(false);
-        DiskStore ds = dsf.create(REGION_NAME);
-        RegionFactory rf = cache.createRegionFactory();
-        rf.setDiskStoreName(ds.getName());
-        rf.setDiskSynchronous(true);
-        rf.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
-        rf.setScope(Scope.DISTRIBUTED_ACK);
-        rf.create(REGION_NAME);
-      }
-    };
-    return vm.invokeAsync(createRegion);
+  private AsyncInvocation createPersistentRegionAsync(final VM memberVM) {
+    return memberVM.invokeAsync("createPersistentRegionAsync", () -> {
+      File dir = new File(diskDir, String.valueOf(ProcessUtils.identifyPid()));
+
+      Cache cache = this.managementTestRule.getCache();
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(new File[] {dir});
+      diskStoreFactory.setMaxOplogSize(1);
+      diskStoreFactory.setAllowForceCompaction(true);
+      diskStoreFactory.setAutoCompact(false);
+      DiskStore diskStore = diskStoreFactory.create(REGION_NAME);
+
+      RegionFactory regionFactory = cache.createRegionFactory();
+      regionFactory.setDiskStoreName(diskStore.getName());
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      regionFactory.create(REGION_NAME);
+    });
   }
 
-  /**
-   * Validates a persistent region
-   * 
-   * @param vm reference to VM
-   */
-  @SuppressWarnings("serial")
-  protected void validatePersistentRegion(final VM vm) {
-    SerializableRunnable validateDisk = new SerializableRunnable("Validate persistent region") {
-      public void run() {
-        Cache cache = getCache();
-        ManagementService service = getManagementService();
-        DiskStoreMXBean bean = service.getLocalDiskStoreMBean(REGION_NAME);
-        assertNotNull(bean);
+  private void verifyRecoveryStats(final VM memberVM, final boolean localRecovery) {
+    memberVM.invoke("verifyRecoveryStats", () -> {
+      Cache cache = this.managementTestRule.getCache();
+      Region region = cache.getRegion(REGION_NAME);
+      DistributedRegion distributedRegion = (DistributedRegion) region;
+      DiskRegionStats stats = distributedRegion.getDiskRegion().getStats();
+
+      if (localRecovery) {
+        assertThat(stats.getLocalInitializations()).isEqualTo(1);
+        assertThat(stats.getRemoteInitializations()).isEqualTo(0);
+      } else {
+        assertThat(stats.getLocalInitializations()).isEqualTo(0);
+        assertThat(stats.getRemoteInitializations()).isEqualTo(1);
       }
-    };
-    vm.invoke(validateDisk);
+    });
   }
 
-  /**
-   * Appends vm id to disk dir
-   * 
-   * @param vm reference to VM
-   * @return
-   */
-  protected File getDiskDirForVM(final VM vm) {
-    File dir = new File(diskDir, String.valueOf(vm.getPid()));
-    return dir;
+  private MemberMXBean awaitMemberMXBeanProxy(final DistributedMember member) {
+    SystemManagementService service = this.managementTestRule.getSystemManagementService();
+    ObjectName objectName = service.getMemberMBeanName(member);
+    await()
+        .until(() -> assertThat(service.getMBeanProxy(objectName, MemberMXBean.class)).isNotNull());
+    return service.getMBeanProxy(objectName, MemberMXBean.class);
   }
 
-  /**
-   * Checks recovery status
-   * 
-   * @param vm reference to VM
-   * @param localRecovery local recovery on or not
-   */
-  @SuppressWarnings("serial")
-  private void checkForRecoveryStat(VM vm, final boolean localRecovery) {
-    vm.invoke(new SerializableRunnable("check disk region stat") {
-
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion(REGION_NAME);
-        DistributedRegion distributedRegion = (DistributedRegion) region;
-        DiskRegionStats stats = distributedRegion.getDiskRegion().getStats();
-        if (localRecovery) {
-          assertEquals(1, stats.getLocalInitializations());
-          assertEquals(0, stats.getRemoteInitializations());
-        } else {
-          assertEquals(0, stats.getLocalInitializations());
-          assertEquals(1, stats.getRemoteInitializations());
-        }
-
-      }
-    });
+  private void await(final AsyncInvocation createPersistentRegionAsync)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    createPersistentRegionAsync.await(2, MINUTES);
   }
 
-  /**
-   * 
-   * @return back up directory
-   */
-  protected static File getBackupDir(String name) throws Exception {
-    File backUpDir = new File("BackupDir-" + name).getAbsoluteFile();
-    org.apache.geode.internal.FileUtil.delete(backUpDir);
-    backUpDir.mkdir();
-    backUpDir.deleteOnExit();
-    return backUpDir;
+  private ConditionFactory await() {
+    return Awaitility.await().atMost(2, MINUTES);
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/914cbfdc/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java
index 4eaba67..79a4ce4 100644
--- a/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java
@@ -14,23 +14,19 @@
  */
 package org.apache.geode.management;
 
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.geode.test.dunit.Host.*;
+import static org.apache.geode.test.dunit.Invoke.*;
+import static org.assertj.core.api.Assertions.*;
 
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.management.InstanceNotFoundException;
 import javax.management.ListenerNotFoundException;
 import javax.management.MBeanServer;
 import javax.management.Notification;
@@ -39,11 +35,16 @@ import javax.management.NotificationFilter;
 import javax.management.NotificationListener;
 import javax.management.ObjectName;
 
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.core.ConditionFactory;
 import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
-import org.apache.geode.cache.Cache;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.admin.Alert;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.logging.LogService;
@@ -57,428 +58,277 @@ import org.apache.geode.management.internal.SystemManagementService;
 import org.apache.geode.management.internal.beans.MemberMBean;
 import org.apache.geode.management.internal.beans.SequenceNumber;
 import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.junit.categories.DistributedTest;
 
 /**
- * Distributed System tests
- * 
- * a) For all the notifications
- * 
- * i) gemfire.distributedsystem.member.joined
- * 
- * ii) gemfire.distributedsystem.member.left
- * 
- * iii) gemfire.distributedsystem.member.suspect
- * 
- * iv ) All notifications emitted by member mbeans
- * 
- * vi) Alerts
- * 
+ * Distributed System management tests
+ * <p>
+ * a) For all the notifications i) gemfire.distributedsystem.member.joined ii)
+ * gemfire.distributedsystem.member.left iii) gemfire.distributedsystem.member.suspect iv ) All
+ * notifications emitted by member mbeans vi) Alerts
+ * <p>
  * b) Concurrently modify proxy list by removing member and accessing the distributed system MBean
- * 
+ * <p>
  * c) Aggregate Operations like shutDownAll
- * 
+ * <p>
  * d) Member level operations like fetchJVMMetrics()
- * 
+ * <p>
  * e ) Statistics
- * 
- * 
- * 
  */
 @Category(DistributedTest.class)
-public class DistributedSystemDUnitTest extends ManagementTestBase {
+@SuppressWarnings({"serial", "unused"})
+public class DistributedSystemDUnitTest implements Serializable {
 
   private static final Logger logger = LogService.getLogger();
 
-  private static final long serialVersionUID = 1L;
-
+  private static final String WARNING_LEVEL_MESSAGE = "Warning Level Alert Message";
+  private static final String SEVERE_LEVEL_MESSAGE = "Severe Level Alert Message";
 
-  private static final int MAX_WAIT = 10 * 1000;
+  private static List<Notification> notifications;
+  private static Map<ObjectName, NotificationListener> notificationListenerMap;
 
-  private static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer;
+  @Manager
+  private VM managerVM;
 
-  static List<Notification> notifList = new ArrayList<>();
+  @Member
+  private VM[] memberVMs;
 
-  static Map<ObjectName, NotificationListener> notificationListenerMap =
-      new HashMap<ObjectName, NotificationListener>();
+  @Rule
+  public ManagementTestRule managementTestRule = ManagementTestRule.builder().build();
 
-  static final String WARNING_LEVEL_MESSAGE = "Warninglevel Alert Message";
-
-  static final String SEVERE_LEVEL_MESSAGE = "Severelevel Alert Message";
+  @Before
+  public void before() throws Exception {
+    notifications = new ArrayList<>();
+    notificationListenerMap = new HashMap<>();
 
+    invokeInEveryVM(() -> notifications = new ArrayList<>());
+    invokeInEveryVM(() -> notificationListenerMap = new HashMap<>());
+  }
 
-  public DistributedSystemDUnitTest() {
-    super();
+  @After
+  public void after() throws Exception {
+    resetAlertCounts(this.managerVM);
   }
 
   /**
    * Tests each and every operations that is defined on the MemberMXBean
-   * 
-   * @throws Exception
    */
   @Test
   public void testDistributedSystemAggregate() throws Exception {
-    VM managingNode = getManagingNode();
-    createManagementCache(managingNode);
-    startManagingNode(managingNode);
-    addNotificationListener(managingNode);
+    this.managementTestRule.createManager(this.managerVM);
+    addNotificationListener(this.managerVM);
 
-    for (VM vm : getManagedNodeList()) {
-      createCache(vm);
+    for (VM memberVM : this.memberVMs) {
+      this.managementTestRule.createMember(memberVM);
     }
 
-    checkAggregate(managingNode);
-    for (VM vm : getManagedNodeList()) {
-      closeCache(vm);
-    }
-
-    closeCache(managingNode);
-
+    verifyDistributedSystemMXBean(this.managerVM);
   }
 
   /**
    * Tests each and every operations that is defined on the MemberMXBean
-   * 
-   * @throws Exception
    */
   @Test
   public void testAlertManagedNodeFirst() throws Exception {
-
-    for (VM vm : getManagedNodeList()) {
-      createCache(vm);
-      warnLevelAlert(vm);
-      severeLevelAlert(vm);
+    for (VM memberVM : this.memberVMs) {
+      this.managementTestRule.createMember(memberVM);
+      generateWarningAlert(memberVM);
+      generateSevereAlert(memberVM);
     }
 
-    VM managingNode = getManagingNode();
-
-    createManagementCache(managingNode);
-    startManagingNode(managingNode);
-    addAlertListener(managingNode);
-    checkAlertCount(managingNode, 0, 0);
+    this.managementTestRule.createManager(this.managerVM);
+    addAlertListener(this.managerVM);
+    verifyAlertCount(this.managerVM, 0, 0);
 
-    final DistributedMember managingMember = getMember(managingNode);
+    DistributedMember managerDistributedMember =
+        this.managementTestRule.getDistributedMember(this.managerVM);
 
     // Before we start we need to ensure that the initial (implicit) SEVERE alert has propagated
     // everywhere.
-    for (VM vm : getManagedNodeList()) {
-      ensureLoggerState(vm, managingMember, Alert.SEVERE);
+    for (VM memberVM : this.memberVMs) {
+      verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE);
     }
 
-    setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.WARNING));
+    setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.WARNING));
 
-    for (VM vm : getManagedNodeList()) {
-      ensureLoggerState(vm, managingMember, Alert.WARNING);
-      warnLevelAlert(vm);
-      severeLevelAlert(vm);
+    for (VM memberVM : this.memberVMs) {
+      verifyAlertAppender(memberVM, managerDistributedMember, Alert.WARNING);
+      generateWarningAlert(memberVM);
+      generateSevereAlert(memberVM);
     }
 
-    checkAlertCount(managingNode, 3, 3);
-    resetAlertCounts(managingNode);
+    verifyAlertCount(this.managerVM, 3, 3);
+    resetAlertCounts(this.managerVM);
 
-    setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.SEVERE));
+    setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.SEVERE));
 
-    for (VM vm : getManagedNodeList()) {
-      ensureLoggerState(vm, managingMember, Alert.SEVERE);
-      warnLevelAlert(vm);
-      severeLevelAlert(vm);
+    for (VM memberVM : this.memberVMs) {
+      verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE);
+      generateWarningAlert(memberVM);
+      generateSevereAlert(memberVM);
     }
 
-    checkAlertCount(managingNode, 3, 0);
-    resetAlertCounts(managingNode);
-
-    for (VM vm : getManagedNodeList()) {
-      closeCache(vm);
-    }
-
-    closeCache(managingNode);
-  }
-
-  @SuppressWarnings("serial")
-  public void ensureLoggerState(VM vm1, final DistributedMember member, final int alertLevel)
-      throws Exception {
-    {
-      vm1.invoke(new SerializableCallable("Ensure Logger State") {
-
-        public Object call() throws Exception {
-
-          Wait.waitForCriterion(new WaitCriterion() {
-            public String description() {
-              return "Waiting for all alert Listener to register with managed node";
-            }
-
-            public boolean done() {
-
-              if (AlertAppender.getInstance().hasAlertListener(member, alertLevel)) {
-                return true;
-              }
-              return false;
-            }
-
-          }, MAX_WAIT, 500, true);
-
-          return null;
-        }
-      });
-
-    }
+    verifyAlertCount(this.managerVM, 3, 0);
   }
 
   /**
    * Tests each and every operations that is defined on the MemberMXBean
-   * 
-   * @throws Exception
    */
   @Test
   public void testShutdownAll() throws Exception {
-    final Host host = Host.getHost(0);
-    VM managedNode1 = host.getVM(0);
-    VM managedNode2 = host.getVM(1);
-    VM managedNode3 = host.getVM(2);
-
-    VM managingNode = host.getVM(3);
-
-    // Managing Node is created first
-    createManagementCache(managingNode);
-    startManagingNode(managingNode);
-
-    createCache(managedNode1);
-    createCache(managedNode2);
-    createCache(managedNode3);
-    shutDownAll(managingNode);
-    closeCache(managingNode);
+    VM memberVM1 = getHost(0).getVM(0);
+    VM memberVM2 = getHost(0).getVM(1);
+    VM memberVM3 = getHost(0).getVM(2);
+
+    VM managerVM = getHost(0).getVM(3);
+
+    // managerVM Node is created first
+    this.managementTestRule.createManager(managerVM);
+
+    this.managementTestRule.createMember(memberVM1);
+    this.managementTestRule.createMember(memberVM2);
+    this.managementTestRule.createMember(memberVM3);
+
+    shutDownAll(managerVM);
   }
 
   @Test
   public void testNavigationAPIS() throws Exception {
+    this.managementTestRule.createManager(this.managerVM);
 
-    final Host host = Host.getHost(0);
-
-    createManagementCache(managingNode);
-    startManagingNode(managingNode);
-
-    for (VM vm : managedNodeList) {
-      createCache(vm);
+    for (VM memberVM : this.memberVMs) {
+      this.managementTestRule.createMember(memberVM);
     }
 
-    checkNavigationAPIs(managingNode);
+    verifyFetchMemberObjectName(this.managerVM, this.memberVMs.length + 1);
   }
 
   @Test
   public void testNotificationHub() throws Exception {
-    this.initManagement(false);
+    this.managementTestRule.createMembers();
+    this.managementTestRule.createManagers();
 
     class NotificationHubTestListener implements NotificationListener {
+
       @Override
       public synchronized void handleNotification(Notification notification, Object handback) {
         logger.info("Notification received {}", notification);
-        notifList.add(notification);
+        notifications.add(notification);
       }
     }
 
-    managingNode.invoke(new SerializableRunnable("Add Listener to MemberMXBean") {
-
-      public void run() {
-        Cache cache = getCache();
-        ManagementService service = getManagementService();
-        final DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-
-        Wait.waitForCriterion(new WaitCriterion() {
-          public String description() {
-            return "Waiting for all members to send their initial Data";
-          }
-
-          public boolean done() {
-            if (bean.listMemberObjectNames().length == 5) {// including locator
-              return true;
-            } else {
-              return false;
-            }
-          }
-        }, MAX_WAIT, 500, true);
-        for (ObjectName objectName : bean.listMemberObjectNames()) {
-          NotificationHubTestListener listener = new NotificationHubTestListener();
-          try {
-            mbeanServer.addNotificationListener(objectName, listener, null, null);
-            notificationListenerMap.put(objectName, listener);
-          } catch (InstanceNotFoundException e) {
-            LogWriterUtils.getLogWriter().error(e);
-          }
-        }
+    this.managerVM.invoke("addListenerToMemberMXBean", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+      final DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+
+      await().until(() -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(5));
+
+      for (ObjectName objectName : distributedSystemMXBean.listMemberObjectNames()) {
+        NotificationHubTestListener listener = new NotificationHubTestListener();
+        ManagementFactory.getPlatformMBeanServer().addNotificationListener(objectName, listener,
+            null, null);
+        notificationListenerMap.put(objectName, listener);
       }
     });
 
     // Check in all VMS
 
-    for (VM vm : managedNodeList) {
-      vm.invoke(new SerializableRunnable("Check Hub Listener num count") {
-
-        public void run() {
-          Cache cache = getCache();
-          SystemManagementService service = (SystemManagementService) getManagementService();
-          NotificationHub hub = service.getNotificationHub();
-          Map<ObjectName, NotificationHubListener> listenerObjectMap = hub.getListenerObjectMap();
-          assertEquals(1, listenerObjectMap.keySet().size());
-          ObjectName memberMBeanName = MBeanJMXAdapter
-              .getMemberMBeanName(cache.getDistributedSystem().getDistributedMember());
-
-          NotificationHubListener listener = listenerObjectMap.get(memberMBeanName);
-
-          /*
-           * Counter of listener should be 2 . One for default Listener which is added for each
-           * member mbean by distributed system mbean One for the added listener in test
-           */
-          assertEquals(2, listener.getNumCounter());
-
-          // Raise some notifications
-
-          NotificationBroadcasterSupport memberLevelNotifEmitter =
-              (MemberMBean) service.getMemberMXBean();
-
-          String memberSource = MBeanJMXAdapter
-              .getMemberNameOrId(cache.getDistributedSystem().getDistributedMember());
-
-          // Only a dummy notification , no actual region is creates
-          Notification notification = new Notification(JMXNotificationType.REGION_CREATED,
-              memberSource, SequenceNumber.next(), System.currentTimeMillis(),
-              ManagementConstants.REGION_CREATED_PREFIX + "/test");
-          memberLevelNotifEmitter.sendNotification(notification);
-
-        }
+    for (VM memberVM : this.memberVMs) {
+      memberVM.invoke("checkNotificationHubListenerCount", () -> {
+        SystemManagementService service = this.managementTestRule.getSystemManagementService();
+        NotificationHub notificationHub = service.getNotificationHub();
+        Map<ObjectName, NotificationHubListener> listenerMap =
+            notificationHub.getListenerObjectMap();
+        assertThat(listenerMap.keySet()).hasSize(1);
+
+        ObjectName memberMBeanName =
+            MBeanJMXAdapter.getMemberMBeanName(this.managementTestRule.getDistributedMember());
+        NotificationHubListener listener = listenerMap.get(memberMBeanName);
+
+        /*
+         * Counter of listener should be 2 . One for default Listener which is added for each member
+         * mbean by distributed system mbean One for the added listener in test
+         */
+        assertThat(listener.getNumCounter()).isEqualTo(2);
+
+        // Raise some notifications
+
+        NotificationBroadcasterSupport notifier = (MemberMBean) service.getMemberMXBean();
+        String memberSource =
+            MBeanJMXAdapter.getMemberNameOrId(this.managementTestRule.getDistributedMember());
+
+        // Only a dummy notification , no actual region is created
+        Notification notification = new Notification(JMXNotificationType.REGION_CREATED,
+            memberSource, SequenceNumber.next(), System.currentTimeMillis(),
+            ManagementConstants.REGION_CREATED_PREFIX + "/test");
+        notifier.sendNotification(notification);
       });
     }
 
-    managingNode.invoke(new SerializableRunnable("Check notifications && Remove Listeners") {
-
-      public void run() {
-
-        Wait.waitForCriterion(new WaitCriterion() {
-          public String description() {
-            return "Waiting for all Notifications to reach the Managing Node";
-          }
-
-          public boolean done() {
-            if (notifList.size() == 3) {
-              return true;
-            } else {
-              return false;
-            }
-          }
-        }, MAX_WAIT, 500, true);
-
-        notifList.clear();
+    this.managerVM.invoke("checkNotificationsAndRemoveListeners", () -> {
+      await().until(() -> assertThat(notifications).hasSize(3));
 
-        Iterator<ObjectName> it = notificationListenerMap.keySet().iterator();
-        while (it.hasNext()) {
-          ObjectName objectName = it.next();
-          NotificationListener listener = notificationListenerMap.get(objectName);
-          try {
-            mbeanServer.removeNotificationListener(objectName, listener);
-          } catch (ListenerNotFoundException e) {
-            LogWriterUtils.getLogWriter().error(e);
-          } catch (InstanceNotFoundException e) {
-            LogWriterUtils.getLogWriter().error(e);
-          }
-        }
+      notifications.clear();
 
+      for (ObjectName objectName : notificationListenerMap.keySet()) {
+        NotificationListener listener = notificationListenerMap.get(objectName);
+        ManagementFactory.getPlatformMBeanServer().removeNotificationListener(objectName, listener);
       }
     });
 
     // Check in all VMS again
 
-    for (VM vm : managedNodeList) {
-      vm.invoke(new SerializableRunnable("Check Hub Listener num count Again") {
-
-        public void run() {
-          Cache cache = getCache();
-          SystemManagementService service = (SystemManagementService) getManagementService();
-          NotificationHub hub = service.getNotificationHub();
-          Map<ObjectName, NotificationHubListener> listenerObjectMap = hub.getListenerObjectMap();
-
-          assertEquals(1, listenerObjectMap.keySet().size());
-
-          ObjectName memberMBeanName = MBeanJMXAdapter
-              .getMemberMBeanName(cache.getDistributedSystem().getDistributedMember());
-
-          NotificationHubListener listener = listenerObjectMap.get(memberMBeanName);
-
-          /*
-           * Counter of listener should be 1 for the default Listener which is added for each member
-           * mbean by distributed system mbean.
-           */
-          assertEquals(1, listener.getNumCounter());
-
-        }
+    for (VM memberVM : this.memberVMs) {
+      memberVM.invoke("checkNotificationHubListenerCountAgain", () -> {
+        SystemManagementService service = this.managementTestRule.getSystemManagementService();
+        NotificationHub hub = service.getNotificationHub();
+        Map<ObjectName, NotificationHubListener> listenerObjectMap = hub.getListenerObjectMap();
+        assertThat(listenerObjectMap.keySet().size()).isEqualTo(1);
+
+        ObjectName memberMBeanName =
+            MBeanJMXAdapter.getMemberMBeanName(this.managementTestRule.getDistributedMember());
+        NotificationHubListener listener = listenerObjectMap.get(memberMBeanName);
+
+        /*
+         * Counter of listener should be 1 for the default Listener which is added for each member
+         * mbean by distributed system mbean.
+         */
+        assertThat(listener.getNumCounter()).isEqualTo(1);
       });
     }
 
-    managingNode.invoke(new SerializableRunnable("Remove Listener from MemberMXBean") {
-
-      public void run() {
-        Cache cache = getCache();
-        ManagementService service = getManagementService();
-        final DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-
-        Wait.waitForCriterion(new WaitCriterion() {
-          public String description() {
-            return "Waiting for all members to send their initial Data";
-          }
-
-          public boolean done() {
-            if (bean.listMemberObjectNames().length == 5) {// including locator
-              return true;
-            } else {
-              return false;
-            }
-
-          }
-
-        }, MAX_WAIT, 500, true);
-        for (ObjectName objectName : bean.listMemberObjectNames()) {
-          NotificationHubTestListener listener = new NotificationHubTestListener();
-          try {
-            mbeanServer.removeNotificationListener(objectName, listener);
-          } catch (InstanceNotFoundException e) {
-            LogWriterUtils.getLogWriter().error(e);
-          } catch (ListenerNotFoundException e) {
-            // TODO: apparently there is never a notification listener on any these mbeans at this
-            // point
-            // fix this test so it doesn't hit these unexpected exceptions --
-            // getLogWriter().error(e);
-          }
+    this.managerVM.invoke("removeListenerFromMemberMXBean", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+      DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+
+      await().until(() -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(5));
+
+      for (ObjectName objectName : distributedSystemMXBean.listMemberObjectNames()) {
+        NotificationHubTestListener listener = new NotificationHubTestListener();
+        try {
+          ManagementFactory.getPlatformMBeanServer().removeNotificationListener(objectName,
+              listener); // because new instance!!
+        } catch (ListenerNotFoundException e) {
+          // TODO: [old] apparently there is never a notification listener on any these mbeans at
+          // this point [fix this]
+          // fix this test so it doesn't hit these unexpected exceptions -- getLogWriter().error(e);
         }
       }
     });
 
-    for (VM vm : managedNodeList) {
-      vm.invoke(new SerializableRunnable("Check Hub Listeners clean up") {
-
-        public void run() {
-          Cache cache = getCache();
-          SystemManagementService service = (SystemManagementService) getManagementService();
-          NotificationHub hub = service.getNotificationHub();
-          hub.cleanUpListeners();
-          assertEquals(0, hub.getListenerObjectMap().size());
-
-          Iterator<ObjectName> it = notificationListenerMap.keySet().iterator();
-          while (it.hasNext()) {
-            ObjectName objectName = it.next();
-            NotificationListener listener = notificationListenerMap.get(objectName);
-            try {
-              mbeanServer.removeNotificationListener(objectName, listener);
-              fail("Found Listeners inspite of clearing them");
-            } catch (ListenerNotFoundException e) {
-              // Expected Exception Do nothing
-            } catch (InstanceNotFoundException e) {
-              LogWriterUtils.getLogWriter().error(e);
-            }
-          }
+    for (VM memberVM : this.memberVMs) {
+      memberVM.invoke("verifyNotificationHubListenersWereRemoved", () -> {
+        SystemManagementService service = this.managementTestRule.getSystemManagementService();
+        NotificationHub notificationHub = service.getNotificationHub();
+        notificationHub.cleanUpListeners();
+        assertThat(notificationHub.getListenerObjectMap()).isEmpty();
+
+        for (ObjectName objectName : notificationListenerMap.keySet()) {
+          NotificationListener listener = notificationListenerMap.get(objectName);
+          assertThatThrownBy(() -> ManagementFactory.getPlatformMBeanServer()
+              .removeNotificationListener(objectName, listener))
+                  .isExactlyInstanceOf(ListenerNotFoundException.class);
         }
       });
     }
@@ -486,404 +336,229 @@ public class DistributedSystemDUnitTest extends ManagementTestBase {
 
   /**
    * Tests each and every operations that is defined on the MemberMXBean
-   * 
-   * @throws Exception
    */
   @Test
   public void testAlert() throws Exception {
-    VM managingNode = getManagingNode();
-
-    createManagementCache(managingNode);
-    startManagingNode(managingNode);
-    addAlertListener(managingNode);
-    resetAlertCounts(managingNode);
-
-    final DistributedMember managingMember = getMember(managingNode);
-
-
+    this.managementTestRule.createManager(this.managerVM);
+    addAlertListener(this.managerVM);
+    resetAlertCounts(this.managerVM);
 
-    warnLevelAlert(managingNode);
-    severeLevelAlert(managingNode);
-    checkAlertCount(managingNode, 1, 0);
-    resetAlertCounts(managingNode);
+    DistributedMember managerDistributedMember =
+        this.managementTestRule.getDistributedMember(this.managerVM);
 
-    for (VM vm : getManagedNodeList()) {
+    generateWarningAlert(this.managerVM);
+    generateSevereAlert(this.managerVM);
+    verifyAlertCount(this.managerVM, 1, 0);
+    resetAlertCounts(this.managerVM);
 
-      createCache(vm);
-      // Default is severe ,So only Severe level alert is expected
+    for (VM memberVM : this.memberVMs) {
+      this.managementTestRule.createMember(memberVM);
 
-      ensureLoggerState(vm, managingMember, Alert.SEVERE);
-
-      warnLevelAlert(vm);
-      severeLevelAlert(vm);
+      verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE);
 
+      generateWarningAlert(memberVM);
+      generateSevereAlert(memberVM);
     }
-    checkAlertCount(managingNode, 3, 0);
-    resetAlertCounts(managingNode);
-    setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.WARNING));
 
+    verifyAlertCount(this.managerVM, 3, 0);
+    resetAlertCounts(this.managerVM);
+    setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.WARNING));
 
-    for (VM vm : getManagedNodeList()) {
-      // warning and severe alerts both are to be checked
-      ensureLoggerState(vm, managingMember, Alert.WARNING);
-      warnLevelAlert(vm);
-      severeLevelAlert(vm);
+    for (VM memberVM : this.memberVMs) {
+      verifyAlertAppender(memberVM, managerDistributedMember, Alert.WARNING);
+      generateWarningAlert(memberVM);
+      generateSevereAlert(memberVM);
     }
 
-    checkAlertCount(managingNode, 3, 3);
-
-    resetAlertCounts(managingNode);
+    verifyAlertCount(this.managerVM, 3, 3);
 
-    setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.OFF));
+    resetAlertCounts(this.managerVM);
 
-    for (VM vm : getManagedNodeList()) {
-      ensureLoggerState(vm, managingMember, Alert.OFF);
-      warnLevelAlert(vm);
-      severeLevelAlert(vm);
-    }
-    checkAlertCount(managingNode, 0, 0);
-    resetAlertCounts(managingNode);
+    setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.OFF));
 
-    for (VM vm : getManagedNodeList()) {
-      closeCache(vm);
+    for (VM memberVM : this.memberVMs) {
+      verifyAlertAppender(memberVM, managerDistributedMember, Alert.OFF);
+      generateWarningAlert(memberVM);
+      generateSevereAlert(memberVM);
     }
 
-    closeCache(managingNode);
-
+    verifyAlertCount(this.managerVM, 0, 0);
   }
 
-  @SuppressWarnings("serial")
-  public void checkAlertCount(VM vm1, final int expectedSevereAlertCount,
-      final int expectedWarningAlertCount) throws Exception {
-    {
-      vm1.invoke(new SerializableCallable("Check Alert Count") {
-
-        public Object call() throws Exception {
-          final AlertNotifListener nt = AlertNotifListener.getInstance();
-          Wait.waitForCriterion(new WaitCriterion() {
-            public String description() {
-              return "Waiting for all alerts to reach the Managing Node";
-            }
-
-            public boolean done() {
-              if (expectedSevereAlertCount == nt.getseverAlertCount()
-                  && expectedWarningAlertCount == nt.getWarnigAlertCount()) {
-                return true;
-              } else {
-                return false;
-              }
-
-            }
-
-          }, MAX_WAIT, 500, true);
-
-          return null;
-        }
-      });
-
-    }
+  private void verifyAlertAppender(final VM memberVM, final DistributedMember member,
+      final int alertLevel) {
+    memberVM.invoke("verifyAlertAppender",
+        () -> await().until(
+            () -> assertThat(AlertAppender.getInstance().hasAlertListener(member, alertLevel))
+                .isTrue()));
   }
 
+  private void verifyAlertCount(final VM managerVM, final int expectedSevereAlertCount,
+      final int expectedWarningAlertCount) {
+    managerVM.invoke("verifyAlertCount", () -> {
+      AlertNotificationListener listener = AlertNotificationListener.getInstance();
 
-
-  @SuppressWarnings("serial")
-  public void setAlertLevel(VM vm1, final String alertLevel) throws Exception {
-    {
-      vm1.invoke(new SerializableCallable("Set Alert level") {
-
-        public Object call() throws Exception {
-          GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-          ManagementService service = getManagementService();
-          DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-          assertNotNull(bean);
-          bean.changeAlertLevel(alertLevel);
-
-          return null;
-        }
-      });
-
-    }
+      await().until(
+          () -> assertThat(listener.getSevereAlertCount()).isEqualTo(expectedSevereAlertCount));
+      await().until(
+          () -> assertThat(listener.getWarningAlertCount()).isEqualTo(expectedWarningAlertCount));
+    });
   }
 
-  @SuppressWarnings("serial")
-  public void warnLevelAlert(VM vm1) throws Exception {
-    {
-      vm1.invoke(new SerializableCallable("Warning level Alerts") {
-
-        public Object call() throws Exception {
-          final IgnoredException warnEx =
-              IgnoredException.addIgnoredException(WARNING_LEVEL_MESSAGE);
-          logger.warn(WARNING_LEVEL_MESSAGE);
-          warnEx.remove();
-          return null;
-        }
-      });
-
-    }
+  private void setAlertLevel(final VM managerVM, final String alertLevel) {
+    managerVM.invoke("setAlertLevel", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+      DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+      distributedSystemMXBean.changeAlertLevel(alertLevel);
+    });
   }
 
-
-  @SuppressWarnings("serial")
-  public void resetAlertCounts(VM vm1) throws Exception {
-    {
-      vm1.invoke(new SerializableCallable("Reset Alert Count") {
-
-        public Object call() throws Exception {
-          AlertNotifListener nt = AlertNotifListener.getInstance();
-          nt.resetCount();
-          return null;
-        }
-      });
-
-    }
+  private void generateWarningAlert(final VM anyVM) {
+    anyVM.invoke("generateWarningAlert", () -> {
+      IgnoredException ignoredException =
+          IgnoredException.addIgnoredException(WARNING_LEVEL_MESSAGE);
+      logger.warn(WARNING_LEVEL_MESSAGE);
+      ignoredException.remove();
+    });
   }
 
-  @SuppressWarnings("serial")
-  public void severeLevelAlert(VM vm1) throws Exception {
-    {
-      vm1.invoke(new SerializableCallable("Severe Level Alert") {
-
-        public Object call() throws Exception {
-          // add expected exception strings
-
-          final IgnoredException severeEx =
-              IgnoredException.addIgnoredException(SEVERE_LEVEL_MESSAGE);
-          logger.fatal(SEVERE_LEVEL_MESSAGE);
-          severeEx.remove();
-          return null;
-        }
-      });
-
-    }
+  private void resetAlertCounts(final VM managerVM) {
+    managerVM.invoke("resetAlertCounts", () -> {
+      AlertNotificationListener listener = AlertNotificationListener.getInstance();
+      listener.resetCount();
+    });
   }
 
-  @SuppressWarnings("serial")
-  public void addAlertListener(VM vm1) throws Exception {
-    {
-      vm1.invoke(new SerializableCallable("Add Alert Listener") {
-
-        public Object call() throws Exception {
-          GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-          ManagementService service = getManagementService();
-          DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-          AlertNotifListener nt = AlertNotifListener.getInstance();
-          nt.resetCount();
-
-          NotificationFilter notificationFilter = new NotificationFilter() {
-            @Override
-            public boolean isNotificationEnabled(Notification notification) {
-              return notification.getType().equals(JMXNotificationType.SYSTEM_ALERT);
-            }
-
-          };
+  private void generateSevereAlert(final VM anyVM) {
+    anyVM.invoke("generateSevereAlert", () -> {
+      IgnoredException ignoredException =
+          IgnoredException.addIgnoredException(SEVERE_LEVEL_MESSAGE);
+      logger.fatal(SEVERE_LEVEL_MESSAGE);
+      ignoredException.remove();
+    });
+  }
 
-          mbeanServer.addNotificationListener(MBeanJMXAdapter.getDistributedSystemName(), nt,
-              notificationFilter, null);
+  private void addAlertListener(final VM managerVM) {
+    managerVM.invoke("addAlertListener", () -> {
+      AlertNotificationListener listener = AlertNotificationListener.getInstance();
+      listener.resetCount();
 
-          return null;
-        }
-      });
+      NotificationFilter notificationFilter = (Notification notification) -> notification.getType()
+          .equals(JMXNotificationType.SYSTEM_ALERT);
 
-    }
+      ManagementFactory.getPlatformMBeanServer().addNotificationListener(
+          MBeanJMXAdapter.getDistributedSystemName(), listener, notificationFilter, null);
+    });
   }
 
   /**
    * Check aggregate related functions and attributes
-   * 
-   * @param vm1
-   * @throws Exception
    */
-  @SuppressWarnings("serial")
-  public void checkAggregate(VM vm1) throws Exception {
-    {
-      vm1.invoke(new SerializableCallable("Chech Aggregate Attributes") {
-
-        public Object call() throws Exception {
-          GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-
-          ManagementService service = getManagementService();
-
-          final DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-          assertNotNull(service.getDistributedSystemMXBean());
-
-          Wait.waitForCriterion(new WaitCriterion() {
-            public String description() {
-              return "Waiting All members to intitialize DistributedSystemMBean expect 5 but found "
-                  + bean.getMemberCount();
-            }
-
-            public boolean done() {
-              // including locator
-              if (bean.getMemberCount() == 5) {
-                return true;
-              } else {
-                return false;
-              }
-
-            }
-
-          }, MAX_WAIT, 500, true);
-
-
+  private void verifyDistributedSystemMXBean(final VM managerVM) {
+    managerVM.invoke("verifyDistributedSystemMXBean", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+      DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
 
-          final Set<DistributedMember> otherMemberSet =
-              cache.getDistributionManager().getOtherNormalDistributionManagerIds();
-          Iterator<DistributedMember> memberIt = otherMemberSet.iterator();
-          while (memberIt.hasNext()) {
-            DistributedMember member = memberIt.next();
-            LogWriterUtils.getLogWriter().info("JVM Metrics For Member " + member.getId() + ":"
-                + bean.showJVMMetrics(member.getId()));
-            LogWriterUtils.getLogWriter().info("OS Metrics For Member " + member.getId() + ":"
-                + bean.showOSMetrics(member.getId()));
-          }
+      await().until(() -> assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5));
 
-          return null;
-        }
-      });
-
-    }
+      Set<DistributedMember> otherMemberSet = this.managementTestRule.getOtherNormalMembers();
+      for (DistributedMember member : otherMemberSet) {
+        // TODO: need assertions? JVMMetrics and OSMetrics
+      }
+    });
   }
 
-  @SuppressWarnings("serial")
-  public void addNotificationListener(VM vm1) throws Exception {
-    {
-      vm1.invoke(new SerializableCallable("Add Notification Listener") {
-
-        public Object call() throws Exception {
-          GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-          ManagementService service = getManagementService();
-          DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-          assertNotNull(bean);
-          TestDistributedSystemNotif nt = new TestDistributedSystemNotif();
-          mbeanServer.addNotificationListener(MBeanJMXAdapter.getDistributedSystemName(), nt, null,
-              null);
-
-          return null;
-        }
-      });
+  private void addNotificationListener(final VM managerVM) {
+    managerVM.invoke("addNotificationListener", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+      DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+      assertThat(distributedSystemMXBean).isNotNull();
 
-    }
+      DistributedSystemNotificationListener listener = new DistributedSystemNotificationListener();
+      ManagementFactory.getPlatformMBeanServer().addNotificationListener(
+          MBeanJMXAdapter.getDistributedSystemName(), listener, null, null);
+    });
   }
 
+  private void shutDownAll(final VM managerVM) {
+    managerVM.invoke("shutDownAll", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+      DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+      distributedSystemMXBean.shutDownAllMembers();
 
-
-  @SuppressWarnings("serial")
-  public void shutDownAll(VM vm1) throws Exception {
-    {
-      vm1.invoke(new SerializableCallable("Shut Down All") {
-
-        public Object call() throws Exception {
-          GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-          ManagementService service = getManagementService();
-          DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-          assertNotNull(service.getDistributedSystemMXBean());
-          bean.shutDownAllMembers();
-          Wait.pause(2000);
-          assertEquals(cache.getDistributedSystem().getAllOtherMembers().size(), 1);
-          return null;
-        }
-      });
-
-    }
+      await().until(() -> assertThat(this.managementTestRule.getOtherNormalMembers()).hasSize(0));
+    });
   }
 
+  private void verifyFetchMemberObjectName(final VM managerVM, final int memberCount) {
+    managerVM.invoke("verifyFetchMemberObjectName", () -> {
+      ManagementService service = this.managementTestRule.getManagementService();
+      DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
 
+      await().until(
+          () -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(memberCount));
 
-  @SuppressWarnings("serial")
-  public void checkNavigationAPIs(VM vm1) throws Exception {
-    {
-      vm1.invoke(new SerializableCallable("Check Navigation APIS") {
-
-        public Object call() throws Exception {
-          GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-          ManagementService service = getManagementService();
-          final DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-
-          assertNotNull(service.getDistributedSystemMXBean());
-
-          waitForAllMembers(4);
-
-          for (int i = 0; i < bean.listMemberObjectNames().length; i++) {
-            LogWriterUtils.getLogWriter()
-                .info("ObjectNames Of the Mmeber" + bean.listMemberObjectNames()[i]);
-          }
-
-
-          ObjectName thisMemberName = MBeanJMXAdapter.getMemberMBeanName(
-              InternalDistributedSystem.getConnectedInstance().getDistributedMember().getId());
-
-          ObjectName memberName = bean.fetchMemberObjectName(
-              InternalDistributedSystem.getConnectedInstance().getDistributedMember().getId());
-          assertEquals(thisMemberName, memberName);
-
-          return null;
-        }
-      });
-
-    }
+      String memberId = this.managementTestRule.getDistributedMember().getId();
+      ObjectName thisMemberName = MBeanJMXAdapter.getMemberMBeanName(memberId);
+      ObjectName memberName = distributedSystemMXBean.fetchMemberObjectName(memberId);
+      assertThat(memberName).isEqualTo(thisMemberName);
+    });
   }
 
+  private ConditionFactory await() {
+    return Awaitility.await().atMost(2, MINUTES);
+  }
 
-  /**
-   * Notification handler
-   * 
-   * 
-   */
-  private static class TestDistributedSystemNotif implements NotificationListener {
+  private static class DistributedSystemNotificationListener implements NotificationListener {
 
     @Override
-    public void handleNotification(Notification notification, Object handback) {
-      assertNotNull(notification);
+    public void handleNotification(final Notification notification, final Object handback) {
+      assertThat(notification).isNotNull();
     }
-
   }
 
-  /**
-   * Notification handler
-   * 
-   * 
-   */
-  private static class AlertNotifListener implements NotificationListener {
+  private static class AlertNotificationListener implements NotificationListener {
+
+    private static AlertNotificationListener listener = new AlertNotificationListener();
+
+    private int warningAlertCount = 0;
 
-    private static AlertNotifListener listener = new AlertNotifListener();
+    private int severeAlertCount = 0;
 
-    public static AlertNotifListener getInstance() {
+    static AlertNotificationListener getInstance() { // TODO: get rid of singleton
       return listener;
     }
 
-    private int warnigAlertCount = 0;
+    @Override
+    public synchronized void handleNotification(final Notification notification,
+        final Object handback) {
+      assertThat(notification).isNotNull();
 
-    private int severAlertCount = 0;
+      Map<String, String> notificationUserData = (Map<String, String>) notification.getUserData();
 
-    @Override
-    public synchronized void handleNotification(Notification notification, Object handback) {
-      assertNotNull(notification);
-      logger.info("Notification received {}", notification);
-      Map<String, String> notifUserData = (Map<String, String>) notification.getUserData();
-      if (notifUserData.get(JMXNotificationUserData.ALERT_LEVEL).equalsIgnoreCase("warning")) {
-        assertEquals(WARNING_LEVEL_MESSAGE, notification.getMessage());
-        ++warnigAlertCount;
+      if (notificationUserData.get(JMXNotificationUserData.ALERT_LEVEL)
+          .equalsIgnoreCase("warning")) {
+        assertThat(notification.getMessage()).isEqualTo(WARNING_LEVEL_MESSAGE);
+        warningAlertCount++;
       }
-      if (notifUserData.get(JMXNotificationUserData.ALERT_LEVEL).equalsIgnoreCase("severe")) {
-        assertEquals(SEVERE_LEVEL_MESSAGE, notification.getMessage());
-        ++severAlertCount;
+      if (notificationUserData.get(JMXNotificationUserData.ALERT_LEVEL)
+          .equalsIgnoreCase("severe")) {
+        assertThat(notification.getMessage()).isEqualTo(SEVERE_LEVEL_MESSAGE);
+        severeAlertCount++;
       }
     }
 
-    public void resetCount() {
-      warnigAlertCount = 0;
-
-      severAlertCount = 0;
+    void resetCount() {
+      warningAlertCount = 0;
+      severeAlertCount = 0;
     }
 
-    public int getWarnigAlertCount() {
-      return warnigAlertCount;
+    int getWarningAlertCount() {
+      return warningAlertCount;
     }
 
-    public int getseverAlertCount() {
-      return severAlertCount;
+    int getSevereAlertCount() {
+      return severeAlertCount;
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/914cbfdc/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java
index 0feb4c2..0096f0d 100644
--- a/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java
@@ -49,7 +49,7 @@ import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
 import org.apache.geode.util.test.TestUtil;
 
-public class JMXMBeanDUnitTest extends DistributedTestCase {
+public class JMXMBeanDUnitTest extends DistributedTestCase { // TODO: rename and fix on Mac
 
   private Host host;
   private VM locator;