You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/12/12 22:36:42 UTC
[39/46] geode git commit: Convert from ManagementTestCase to
ManagementTestRule
http://git-wip-us.apache.org/repos/asf/geode/blob/c3586a96/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java
index bcfadde..bfeebea 100644
--- a/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java
@@ -14,24 +14,25 @@
*/
package org.apache.geode.management;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
+import static java.util.concurrent.TimeUnit.*;
+import static org.assertj.core.api.Assertions.*;
import java.io.File;
-import java.util.Arrays;
-import java.util.List;
+import java.io.Serializable;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import javax.management.ObjectName;
-import org.apache.geode.LogWriter;
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.core.ConditionFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskStore;
@@ -40,674 +41,366 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.internal.cache.DiskRegion;
import org.apache.geode.internal.cache.DiskRegionStats;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
-import org.apache.geode.management.internal.MBeanJMXAdapter;
+import org.apache.geode.internal.process.ProcessUtils;
+import org.apache.geode.management.internal.SystemManagementService;
import org.apache.geode.test.dunit.AsyncInvocation;
-import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
/**
- * Test cases to cover all test cases which pertains to disk from Management layer
- *
- *
+ * Test cases to cover all test cases which pertains to disk from Management
+ * layer
*/
@Category(DistributedTest.class)
-public class DiskManagementDUnitTest extends ManagementTestBase {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- // This must be bigger than the dunit ack-wait-threshold for the revoke
- // tests. The command line is setting the ack-wait-threshold to be
- // 60 seconds.
- private static final int MAX_WAIT = 70 * 1000;
-
- boolean testFailed = false;
+@SuppressWarnings({ "serial", "unused" })
+public class DiskManagementDUnitTest implements Serializable {
- String failureCause = "";
- static final String REGION_NAME = "region";
+ private static final String REGION_NAME = DiskManagementDUnitTest.class.getSimpleName() + "_region";
private File diskDir;
- protected static LogWriter logWriter;
+ @Manager
+ private VM managerVM;
- public DiskManagementDUnitTest() throws Exception {
- super();
+ @Member
+ private VM[] memberVMs;
- diskDir = new File("diskDir-" + getName()).getAbsoluteFile();
- org.apache.geode.internal.FileUtil.delete(diskDir);
- diskDir.mkdir();
- diskDir.deleteOnExit();
- }
+ @Rule
+ public ManagementTestRule managementTestRule = ManagementTestRule.builder().start(true).build();
- @Override
- protected final void postSetUpManagementTestBase() throws Exception {
- failureCause = "";
- testFailed = false;
- }
+ @Rule
+ public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
- @Override
- protected final void postTearDownManagementTestBase() throws Exception {
- org.apache.geode.internal.FileUtil.delete(diskDir);
+ @Before
+ public void before() throws Exception {
+ this.diskDir = this.temporaryFolder.newFolder("diskDir");
}
/**
- * Tests Disk Compaction from a MemberMbean which is at cache level. All the disks which belong to
- * the cache should be compacted.
- *
- * @throws Exception
+ * Tests Disk Compaction from a MemberMXBean which is at cache level. All the
+ * disks which belong to the cache should be compacted.
*/
-
@Test
- public void testDiskCompact() throws Throwable {
- initManagement(false);
- for (VM vm : getManagedNodeList()) {
- createPersistentRegion(vm);
- makeDiskCompactable(vm);
+ public void testDiskCompact() throws Exception {
+ for (VM memberVM : this.memberVMs) {
+ createPersistentRegion(memberVM);
+ makeDiskCompactable(memberVM);
}
- for (VM vm : getManagedNodeList()) {
- compactAllDiskStores(vm);
+ for (VM memberVM : this.memberVMs) {
+ compactAllDiskStores(memberVM);
}
-
}
/**
- * Tests Disk Compaction from a MemberMbean which is at cache level. All the disks which belong to
- * the cache should be compacted.
- *
- * @throws Exception
+ * Tests Disk Compaction from a MemberMXBean which is at cache level. All the
+ * disks which belong to the cache should be compacted.
*/
-
@Test
- public void testDiskCompactRemote() throws Throwable {
-
- initManagement(false);
- for (VM vm : getManagedNodeList()) {
- createPersistentRegion(vm);
- makeDiskCompactable(vm);
+ public void testDiskCompactRemote() throws Exception {
+ for (VM memberVM : this.memberVMs) {
+ createPersistentRegion(memberVM);
+ makeDiskCompactable(memberVM);
}
- compactDiskStoresRemote(managingNode);
+ compactDiskStoresRemote(this.managerVM, this.memberVMs.length);
}
/**
* Tests various operations defined on DiskStore Mbean
- *
- * @throws Exception
*/
-
@Test
- public void testDiskOps() throws Throwable {
-
- initManagement(false);
- for (VM vm : getManagedNodeList()) {
- createPersistentRegion(vm);
- makeDiskCompactable(vm);
- invokeFlush(vm);
- invokeForceRoll(vm);
- invokeForceCompaction(vm);
+ public void testDiskOps() throws Exception {
+ for (VM memberVM : this.memberVMs) {
+ createPersistentRegion(memberVM);
+ makeDiskCompactable(memberVM);
+ invokeFlush(memberVM);
+ invokeForceRoll(memberVM);
+ invokeForceCompaction(memberVM);
}
-
}
@Test
- public void testDiskBackupAllMembers() throws Throwable {
- initManagement(false);
- for (VM vm : getManagedNodeList()) {
- createPersistentRegion(vm);
- makeDiskCompactable(vm);
-
+ public void testDiskBackupAllMembers() throws Exception {
+ for (VM memberVM : this.memberVMs) {
+ createPersistentRegion(memberVM);
+ makeDiskCompactable(memberVM);
}
- backupAllMembers(managingNode);
+
+ backupAllMembers(this.managerVM, this.memberVMs.length);
}
/**
- * Checks the test case of missing disks and revoking them through MemberMbean interfaces
- *
- * @throws Throwable
+ * Checks the test case of missing disks and revoking them through MemberMXBean
+ * interfaces
*/
- @SuppressWarnings("serial")
@Test
- public void testMissingMembers() throws Throwable {
+ public void testMissingMembers() throws Exception {
+ VM memberVM1 = this.memberVMs[0];
+ VM memberVM2 = this.memberVMs[1];
- initManagement(false);
- VM vm0 = getManagedNodeList().get(0);
- VM vm1 = getManagedNodeList().get(1);
- VM vm2 = getManagedNodeList().get(2);
+ createPersistentRegion(memberVM1);
+ createPersistentRegion(memberVM2);
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("Creating region in VM0");
- createPersistentRegion(vm0);
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("Creating region in VM1");
- createPersistentRegion(vm1);
+ putAnEntry(memberVM1);
- putAnEntry(vm0);
+ this.managerVM.invoke("checkForMissingDiskStores", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
+ DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+ PersistentMemberDetails[] missingDiskStores = distributedSystemMXBean.listMissingDiskStores();
+ assertThat(missingDiskStores).isNull();
+ });
- managingNode.invoke(new SerializableRunnable("Check for waiting regions") {
+ closeRegion(memberVM1);
- public void run() {
- Cache cache = getCache();
- ManagementService service = getManagementService();
- DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
- PersistentMemberDetails[] missingDiskStores = bean.listMissingDiskStores();
+ updateTheEntry(memberVM2, "C");
- assertNull(missingDiskStores);
- }
- });
+ closeRegion(memberVM2);
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("closing region in vm0");
- closeRegion(vm0);
-
- updateTheEntry(vm1);
-
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("closing region in vm1");
- closeRegion(vm1);
- AsyncInvocation future = createPersistentRegionAsync(vm0);
- waitForBlockedInitialization(vm0);
- assertTrue(future.isAlive());
-
- managingNode.invoke(new SerializableRunnable("Revoke the member") {
-
- public void run() {
- Cache cache = getCache();
- GemFireCacheImpl cacheImpl = (GemFireCacheImpl) cache;
- ManagementService service = getManagementService();
- DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
- PersistentMemberDetails[] missingDiskStores = bean.listMissingDiskStores();
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
- .info("waiting members=" + missingDiskStores);
- assertNotNull(missingDiskStores);
- assertEquals(1, missingDiskStores.length);
-
- for (PersistentMemberDetails id : missingDiskStores) {
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
- .info("Missing DiskStoreID is =" + id.getDiskStoreId());
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
- .info("Missing Host is =" + id.getHost());
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
- .info("Missing Directory is =" + id.getDirectory());
-
- try {
- bean.revokeMissingDiskStores(id.getDiskStoreId());
- } catch (Exception e) {
- fail("revokeMissingDiskStores failed with exception " + e);
- }
- }
- }
- });
+ AsyncInvocation creatingPersistentRegionAsync = createPersistentRegionAsync(memberVM1);
- future.join(MAX_WAIT);
- if (future.isAlive()) {
- fail("Region not created within" + MAX_WAIT);
- }
- if (future.exceptionOccurred()) {
- throw new Exception(future.getException());
- }
- checkForRecoveryStat(vm0, true);
- // Check to make sure we recovered the old
- // value of the entry.
- SerializableRunnable checkForEntry = new SerializableRunnable("check for the entry") {
-
- public void run() {
- Cache cache = getCache();
- Region region = cache.getRegion(REGION_NAME);
- assertEquals("B", region.get("A"));
- }
- };
- vm0.invoke(checkForEntry);
+ memberVM1.invoke(() ->
+ await().until(() -> {
+ GemFireCacheImpl cache = (GemFireCacheImpl) this.managementTestRule.getCache();
+ PersistentMemberManager persistentMemberManager = cache.getPersistentMemberManager();
+ Map<String, Set<PersistentMemberID>> regions = persistentMemberManager.getWaitingRegions();
+ return !regions.isEmpty();
+ })
+ );
- }
+ assertThat(creatingPersistentRegionAsync.isAlive()).isTrue();
- protected void checkNavigation(final VM vm, final DistributedMember diskMember,
- final String diskStoreName) {
- SerializableRunnable checkNavigation = new SerializableRunnable("Check Navigation") {
- public void run() {
+ this.managerVM.invoke("revokeMissingDiskStore", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
+ DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
+ PersistentMemberDetails[] missingDiskStores = bean.listMissingDiskStores();
- final ManagementService service = getManagementService();
+ assertThat(missingDiskStores).isNotNull().hasSize(1);
- DistributedSystemMXBean disMBean = service.getDistributedSystemMXBean();
- try {
- ObjectName expected =
- MBeanJMXAdapter.getDiskStoreMBeanName(diskMember.getId(), diskStoreName);
- ObjectName actual = disMBean.fetchDiskStoreObjectName(diskMember.getId(), diskStoreName);
- assertEquals(expected, actual);
- } catch (Exception e) {
- fail("Disk Store Navigation Failed " + e);
- }
+ assertThat(bean.revokeMissingDiskStores(missingDiskStores[0].getDiskStoreId())).isTrue();
+ });
+ await(creatingPersistentRegionAsync);
- }
- };
- vm.invoke(checkNavigation);
- }
+ verifyRecoveryStats(memberVM1, true);
- /**
- * get Distributed member for a given vm
- */
- @SuppressWarnings("serial")
- protected static DistributedMember getMember() throws Exception {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- return cache.getDistributedSystem().getDistributedMember();
+ // Check to make sure we recovered the old value of the entry.
+ memberVM1.invoke("check for the entry", () -> {
+ Cache cache = this.managementTestRule.getCache();
+ Region region = cache.getRegion(REGION_NAME);
+ assertThat(region.get("A")).isEqualTo("B");
+ });
}
/**
* Invokes flush on the given disk store by MBean interface
- *
- * @param vm reference to VM
*/
- @SuppressWarnings("serial")
- public void invokeFlush(final VM vm) {
- SerializableRunnable invokeFlush = new SerializableRunnable("Invoke Flush On Disk") {
- public void run() {
- Cache cache = getCache();
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- String name = "testFlush_" + vm.getPid();
- DiskStore ds = dsf.create(name);
-
- ManagementService service = getManagementService();
- DiskStoreMXBean bean = service.getLocalDiskStoreMBean(name);
- assertNotNull(bean);
- bean.flush();
- }
- };
- vm.invoke(invokeFlush);
+ private void invokeFlush(final VM memberVM) {
+ memberVM.invoke("invokeFlush", () -> {
+ Cache cache = this.managementTestRule.getCache();
+ DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+ String name = "testFlush_" + ProcessUtils.identifyPid();
+ DiskStore diskStore = diskStoreFactory.create(name);
+
+ ManagementService service = this.managementTestRule.getManagementService();
+ DiskStoreMXBean diskStoreMXBean = service.getLocalDiskStoreMBean(name);
+ assertThat(diskStoreMXBean).isNotNull();
+ assertThat(diskStoreMXBean.getName()).isEqualTo(diskStore.getName());
+
+ diskStoreMXBean.flush();
+ });
}
/**
* Invokes force roll on disk store by MBean interface
- *
- * @param vm reference to VM
*/
- @SuppressWarnings("serial")
- public void invokeForceRoll(final VM vm) {
- SerializableRunnable invokeForceRoll = new SerializableRunnable("Invoke Force Roll") {
- public void run() {
- Cache cache = getCache();
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- String name = "testForceRoll_" + vm.getPid();
- DiskStore ds = dsf.create(name);
- ManagementService service = getManagementService();
- DiskStoreMXBean bean = service.getLocalDiskStoreMBean(name);
- assertNotNull(bean);
- bean.forceRoll();
- }
- };
- vm.invoke(invokeForceRoll);
+ private void invokeForceRoll(final VM memberVM) {
+ memberVM.invoke("invokeForceRoll", () -> {
+ Cache cache = this.managementTestRule.getCache();
+ DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+ String name = "testForceRoll_" + ProcessUtils.identifyPid();
+ DiskStore diskStore = diskStoreFactory.create(name);
+
+ ManagementService service = this.managementTestRule.getManagementService();
+ DiskStoreMXBean diskStoreMXBean = service.getLocalDiskStoreMBean(name);
+ assertThat(diskStoreMXBean).isNotNull();
+ assertThat(diskStoreMXBean.getName()).isEqualTo(diskStore.getName());
+
+ diskStoreMXBean.forceRoll();
+ });
}
/**
* Invokes force compaction on disk store by MBean interface
- *
- * @param vm reference to VM
*/
- @SuppressWarnings("serial")
- public void invokeForceCompaction(final VM vm) {
- SerializableRunnable invokeForceCompaction =
- new SerializableRunnable("Invoke Force Compaction") {
- public void run() {
- Cache cache = getCache();
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setAllowForceCompaction(true);
- String name = "testForceCompaction_" + vm.getPid();
- DiskStore ds = dsf.create(name);
- ManagementService service = getManagementService();
- DiskStoreMXBean bean = service.getLocalDiskStoreMBean(name);
- assertNotNull(bean);
- assertEquals(false, bean.forceCompaction());
- }
- };
- vm.invoke(invokeForceCompaction);
+ private void invokeForceCompaction(final VM memberVM) {
+ memberVM.invoke("invokeForceCompaction", () -> {
+ Cache cache = this.managementTestRule.getCache();
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ dsf.setAllowForceCompaction(true);
+ String name = "testForceCompaction_" + ProcessUtils.identifyPid();
+ DiskStore diskStore = dsf.create(name);
+
+ ManagementService service = this.managementTestRule.getManagementService();
+ DiskStoreMXBean diskStoreMXBean = service.getLocalDiskStoreMBean(name);
+ assertThat(diskStoreMXBean).isNotNull();
+ assertThat(diskStoreMXBean.getName()).isEqualTo(diskStore.getName());
+
+ assertThat(diskStoreMXBean.forceCompaction()).isFalse();
+ });
}
/**
* Makes the disk compactable by adding and deleting some entries
- *
- * @throws Exception
*/
- @SuppressWarnings("serial")
- public void makeDiskCompactable(VM vm1) throws Exception {
- vm1.invoke(new SerializableRunnable("Make The Disk Compactable") {
-
- public void run() {
- Cache cache = getCache();
- Region region = cache.getRegion(REGION_NAME);
- DiskRegion dr = ((LocalRegion) region).getDiskRegion();
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("putting key1");
- region.put("key1", "value1");
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("putting key2");
- region.put("key2", "value2");
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("removing key2");
- region.remove("key2");
- // now that it is compactable the following forceCompaction should
- // go ahead and do a roll and compact it.
- }
+ private void makeDiskCompactable(final VM memberVM) throws Exception {
+ memberVM.invoke("makeDiskCompactable", () -> {
+ Cache cache = this.managementTestRule.getCache();
+ Region region = cache.getRegion(REGION_NAME);
+ region.put("key1", "value1");
+ region.put("key2", "value2");
+ region.remove("key2");
+ // now that it is compactable the following forceCompaction should
+ // go ahead and do a roll and compact it.
});
-
}
-
-
/**
* Compacts all DiskStores belonging to a member
- *
- * @param vm1 reference to VM
- * @throws Exception
*/
- @SuppressWarnings("serial")
- public void compactAllDiskStores(VM vm1) throws Exception {
-
- vm1.invoke(new SerializableCallable("Compact All Disk Stores") {
-
- public Object call() throws Exception {
- ManagementService service = getManagementService();
- MemberMXBean memberBean = service.getMemberMXBean();
- String[] compactedDiskStores = memberBean.compactAllDiskStores();
-
- assertTrue(compactedDiskStores.length > 0);
- for (int i = 0; i < compactedDiskStores.length; i++) {
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
- .info("<ExpectedString> Compacted Store " + i + " " + compactedDiskStores[i]
- + "</ExpectedString> ");
- }
-
- return null;
- }
+ private void compactAllDiskStores(final VM memberVM) throws Exception {
+ memberVM.invoke("compactAllDiskStores", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
+ MemberMXBean memberMXBean = service.getMemberMXBean();
+ String[] compactedDiskStores = memberMXBean.compactAllDiskStores();
+ assertThat(compactedDiskStores).hasSize(1);
});
-
}
/**
* Takes a back up of all the disk store in a given directory
*/
- @SuppressWarnings("serial")
- public void backupAllMembers(final VM managingVM) throws Exception {
+ private void backupAllMembers(final VM managerVM, final int memberCount) {
+ managerVM.invoke("backupAllMembers", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
+ DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
+ File backupDir = this.temporaryFolder.newFolder("backupDir");
- managingVM.invoke(new SerializableCallable("Backup All Disk Stores") {
+ DiskBackupStatus status = bean.backupAllMembers(backupDir.getAbsolutePath(), null);
- public Object call() throws Exception {
- ManagementService service = getManagementService();
- DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
- DiskBackupStatus status =
- bean.backupAllMembers(getBackupDir("test_backupAllMembers").getAbsolutePath(), null);
-
- return null;
- }
+ assertThat(status.getBackedUpDiskStores().keySet().size()).isEqualTo(memberCount);
+ assertThat(status.getOfflineDiskStores()).isEqualTo(null); // TODO: fix GEODE-1946
});
-
}
/**
- * Compact a disk store from Managing node
+ * Compact a disk store from managerVM VM
*/
- @SuppressWarnings("serial")
- public void compactDiskStoresRemote(VM managingVM) throws Exception {
- {
-
- managingVM.invoke(new SerializableCallable("Compact All Disk Stores Remote") {
-
- public Object call() throws Exception {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- Set<DistributedMember> otherMemberSet =
- cache.getDistributionManager().getOtherNormalDistributionManagerIds();
-
- for (DistributedMember member : otherMemberSet) {
- MemberMXBean bean = MBeanUtil.getMemberMbeanProxy(member);
- String[] allDisks = bean.listDiskStores(true);
- assertNotNull(allDisks);
- List<String> listString = Arrays.asList(allDisks);
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
- .info("<ExpectedString> Remote All Disk Stores Are " + listString.toString()
- + "</ExpectedString> ");
- String[] compactedDiskStores = bean.compactAllDiskStores();
- assertTrue(compactedDiskStores.length > 0);
- for (int i = 0; i < compactedDiskStores.length; i++) {
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
- .info("<ExpectedString> Remote Compacted Store " + i + " "
- + compactedDiskStores[i] + "</ExpectedString> ");
- }
-
- }
- return null;
- }
- });
+ private void compactDiskStoresRemote(final VM managerVM, final int memberCount) {
+ managerVM.invoke("compactDiskStoresRemote", () -> {
+ Set<DistributedMember> otherMemberSet = this.managementTestRule.getOtherNormalMembers();// ((GemFireCacheImpl)cache).getDistributionManager().getOtherNormalDistributionManagerIds();
+ assertThat(otherMemberSet.size()).isEqualTo(memberCount);
- }
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
- }
-
- /**
- * Checks if a file with the given extension is present
- *
- * @param fileExtension file extension
- * @throws Exception
- */
- protected void checkIfContainsFileWithExt(String fileExtension) throws Exception {
- File[] files = diskDir.listFiles();
- for (int j = 0; j < files.length; j++) {
- if (files[j].getAbsolutePath().endsWith(fileExtension)) {
- fail("file \"" + files[j].getAbsolutePath() + "\" still exists");
- }
- }
+ for (DistributedMember member : otherMemberSet) {
+ MemberMXBean memberMXBean = awaitMemberMXBeanProxy(member);
- }
+ String[] allDisks = memberMXBean.listDiskStores(true);
+ assertThat(allDisks).isNotNull().hasSize(1);
- /**
- * Update Entry
- *
- * @param vm1 reference to VM
- */
- protected void updateTheEntry(VM vm1) {
- updateTheEntry(vm1, "C");
- }
-
- /**
- * Update an Entry
- *
- * @param vm1 reference to VM
- * @param value Value which is updated
- */
- @SuppressWarnings("serial")
- protected void updateTheEntry(VM vm1, final String value) {
- vm1.invoke(new SerializableRunnable("change the entry") {
-
- public void run() {
- Cache cache = getCache();
- Region region = cache.getRegion(REGION_NAME);
- region.put("A", value);
+ String[] compactedDiskStores = memberMXBean.compactAllDiskStores();
+ assertThat(compactedDiskStores).hasSize(1);
}
});
}
- /**
- * Put an entry to region
- *
- * @param vm0 reference to VM
- */
- @SuppressWarnings("serial")
- protected void putAnEntry(VM vm0) {
- vm0.invoke(new SerializableRunnable("Put an entry") {
-
- public void run() {
- Cache cache = getCache();
- Region region = cache.getRegion(REGION_NAME);
- region.put("A", "B");
- }
+ private void updateTheEntry(final VM memberVM, final String value) {
+ memberVM.invoke("updateTheEntry", () -> {
+ Cache cache = this.managementTestRule.getCache();
+ Region region = cache.getRegion(REGION_NAME);
+ region.put("A", value);
});
}
- /**
- * Close the given region REGION_NAME
- *
- * @param vm reference to VM
- */
- @SuppressWarnings("serial")
- protected void closeRegion(final VM vm) {
- SerializableRunnable closeRegion = new SerializableRunnable("Close persistent region") {
- public void run() {
- Cache cache = getCache();
- Region region = cache.getRegion(REGION_NAME);
- region.close();
- }
- };
- vm.invoke(closeRegion);
+ private void putAnEntry(final VM memberVM) {
+ memberVM.invoke("putAnEntry", () -> {
+ Cache cache = managementTestRule.getCache();
+ Region region = cache.getRegion(REGION_NAME);
+ region.put("A", "B");
+ });
}
- /**
- * Waiting to blocked waiting for another persistent member to come online
- *
- * @param vm reference to VM
- */
- @SuppressWarnings("serial")
- private void waitForBlockedInitialization(VM vm) {
- vm.invoke(new SerializableRunnable() {
-
- public void run() {
- Wait.waitForCriterion(new WaitCriterion() {
-
- public String description() {
- return "Waiting to blocked waiting for another persistent member to come online";
- }
-
- public boolean done() {
- Cache cache = getCache();
- GemFireCacheImpl cacheImpl = (GemFireCacheImpl) cache;
- PersistentMemberManager mm = cacheImpl.getPersistentMemberManager();
- Map<String, Set<PersistentMemberID>> regions = mm.getWaitingRegions();
- boolean done = !regions.isEmpty();
- return done;
- }
-
- }, MAX_WAIT, 100, true);
-
- }
-
+ private void closeRegion(final VM memberVM) {
+ memberVM.invoke("closeRegion", () -> {
+ Cache cache = this.managementTestRule.getCache();
+ Region region = cache.getRegion(REGION_NAME);
+ region.close();
});
}
- /**
- * Creates a persistent region
- *
- * @param vm reference to VM
- * @throws Throwable
- */
- protected void createPersistentRegion(VM vm) throws Throwable {
- AsyncInvocation future = createPersistentRegionAsync(vm);
- future.join(MAX_WAIT);
- if (future.isAlive()) {
- fail("Region not created within" + MAX_WAIT);
- }
- if (future.exceptionOccurred()) {
- throw new RuntimeException(future.getException());
- }
+ private void createPersistentRegion(final VM memberVM) throws InterruptedException, ExecutionException, TimeoutException {
+ await(createPersistentRegionAsync(memberVM));
}
- /**
- * Creates a persistent region in async manner
- *
- * @param vm reference to VM
- * @return reference to AsyncInvocation
- */
- @SuppressWarnings("serial")
- protected AsyncInvocation createPersistentRegionAsync(final VM vm) {
- SerializableRunnable createRegion = new SerializableRunnable("Create persistent region") {
- public void run() {
- Cache cache = getCache();
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- File dir = getDiskDirForVM(vm);
- dir.mkdirs();
- dsf.setDiskDirs(new File[] {dir});
- dsf.setMaxOplogSize(1);
- dsf.setAllowForceCompaction(true);
- dsf.setAutoCompact(false);
- DiskStore ds = dsf.create(REGION_NAME);
- RegionFactory rf = cache.createRegionFactory();
- rf.setDiskStoreName(ds.getName());
- rf.setDiskSynchronous(true);
- rf.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
- rf.setScope(Scope.DISTRIBUTED_ACK);
- rf.create(REGION_NAME);
- }
- };
- return vm.invokeAsync(createRegion);
+ private AsyncInvocation createPersistentRegionAsync(final VM memberVM) {
+ return memberVM.invokeAsync("createPersistentRegionAsync", () -> {
+ File dir = new File(diskDir, String.valueOf(ProcessUtils.identifyPid()));
+
+ Cache cache = this.managementTestRule.getCache();
+
+ DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+ diskStoreFactory.setDiskDirs(new File[] { dir });
+ diskStoreFactory.setMaxOplogSize(1);
+ diskStoreFactory.setAllowForceCompaction(true);
+ diskStoreFactory.setAutoCompact(false);
+ DiskStore diskStore = diskStoreFactory.create(REGION_NAME);
+
+ RegionFactory regionFactory = cache.createRegionFactory();
+ regionFactory.setDiskStoreName(diskStore.getName());
+ regionFactory.setDiskSynchronous(true);
+ regionFactory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+ regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+ regionFactory.create(REGION_NAME);
+ });
}
- /**
- * Validates a persistent region
- *
- * @param vm reference to VM
- */
- @SuppressWarnings("serial")
- protected void validatePersistentRegion(final VM vm) {
- SerializableRunnable validateDisk = new SerializableRunnable("Validate persistent region") {
- public void run() {
- Cache cache = getCache();
- ManagementService service = getManagementService();
- DiskStoreMXBean bean = service.getLocalDiskStoreMBean(REGION_NAME);
- assertNotNull(bean);
+ private void verifyRecoveryStats(final VM memberVM, final boolean localRecovery) {
+ memberVM.invoke("verifyRecoveryStats", () -> {
+ Cache cache = this.managementTestRule.getCache();
+ Region region = cache.getRegion(REGION_NAME);
+ DistributedRegion distributedRegion = (DistributedRegion) region;
+ DiskRegionStats stats = distributedRegion.getDiskRegion().getStats();
+
+ if (localRecovery) {
+ assertThat(stats.getLocalInitializations()).isEqualTo(1);
+ assertThat(stats.getRemoteInitializations()).isEqualTo(0);
+ } else {
+ assertThat(stats.getLocalInitializations()).isEqualTo(0);
+ assertThat(stats.getRemoteInitializations()).isEqualTo(1);
}
- };
- vm.invoke(validateDisk);
+ });
}
- /**
- * Appends vm id to disk dir
- *
- * @param vm reference to VM
- * @return
- */
- protected File getDiskDirForVM(final VM vm) {
- File dir = new File(diskDir, String.valueOf(vm.getPid()));
- return dir;
+ private MemberMXBean awaitMemberMXBeanProxy(final DistributedMember member) {
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
+ ObjectName objectName = service.getMemberMBeanName(member);
+ await().until(() -> assertThat(service.getMBeanProxy(objectName, MemberMXBean.class)).isNotNull());
+ return service.getMBeanProxy(objectName, MemberMXBean.class);
}
- /**
- * Checks recovery status
- *
- * @param vm reference to VM
- * @param localRecovery local recovery on or not
- */
- @SuppressWarnings("serial")
- private void checkForRecoveryStat(VM vm, final boolean localRecovery) {
- vm.invoke(new SerializableRunnable("check disk region stat") {
-
- public void run() {
- Cache cache = getCache();
- Region region = cache.getRegion(REGION_NAME);
- DistributedRegion distributedRegion = (DistributedRegion) region;
- DiskRegionStats stats = distributedRegion.getDiskRegion().getStats();
- if (localRecovery) {
- assertEquals(1, stats.getLocalInitializations());
- assertEquals(0, stats.getRemoteInitializations());
- } else {
- assertEquals(0, stats.getLocalInitializations());
- assertEquals(1, stats.getRemoteInitializations());
- }
-
- }
- });
+ private void await(final AsyncInvocation createPersistentRegionAsync) throws InterruptedException, ExecutionException, TimeoutException {
+ createPersistentRegionAsync.await(2, MINUTES);
}
- /**
- *
- * @return back up directory
- */
- protected static File getBackupDir(String name) throws Exception {
- File backUpDir = new File("BackupDir-" + name).getAbsoluteFile();
- org.apache.geode.internal.FileUtil.delete(backUpDir);
- backUpDir.mkdir();
- backUpDir.deleteOnExit();
- return backUpDir;
+ private ConditionFactory await() {
+ return Awaitility.await().atMost(2, MINUTES);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/c3586a96/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java
index 4eaba67..cd05cde 100644
--- a/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java
@@ -14,23 +14,19 @@
*/
package org.apache.geode.management;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.geode.test.dunit.Host.*;
+import static org.apache.geode.test.dunit.Invoke.*;
+import static org.assertj.core.api.Assertions.*;
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.management.InstanceNotFoundException;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanServer;
import javax.management.Notification;
@@ -39,11 +35,16 @@ import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.core.ConditionFactory;
import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
-import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.admin.Alert;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.logging.LogService;
@@ -57,428 +58,269 @@ import org.apache.geode.management.internal.SystemManagementService;
import org.apache.geode.management.internal.beans.MemberMBean;
import org.apache.geode.management.internal.beans.SequenceNumber;
import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.junit.categories.DistributedTest;
/**
- * Distributed System tests
- *
+ * Distributed System management tests
+ * <p>
* a) For all the notifications
- *
* i) gemfire.distributedsystem.member.joined
- *
* ii) gemfire.distributedsystem.member.left
- *
* iii) gemfire.distributedsystem.member.suspect
- *
* iv ) All notifications emitted by member mbeans
- *
* vi) Alerts
- *
- * b) Concurrently modify proxy list by removing member and accessing the distributed system MBean
- *
+ * <p>
+ * b) Concurrently modify proxy list by removing member and accessing the
+ * distributed system MBean
+ * <p>
* c) Aggregate Operations like shutDownAll
- *
+ * <p>
* d) Member level operations like fetchJVMMetrics()
- *
+ * <p>
* e ) Statistics
- *
- *
- *
*/
@Category(DistributedTest.class)
-public class DistributedSystemDUnitTest extends ManagementTestBase {
+@SuppressWarnings({ "serial", "unused" })
+public class DistributedSystemDUnitTest implements Serializable {
private static final Logger logger = LogService.getLogger();
- private static final long serialVersionUID = 1L;
-
+ private static final String WARNING_LEVEL_MESSAGE = "Warning Level Alert Message";
+ private static final String SEVERE_LEVEL_MESSAGE = "Severe Level Alert Message";
- private static final int MAX_WAIT = 10 * 1000;
+ private static List<Notification> notifications;
+ private static Map<ObjectName, NotificationListener> notificationListenerMap;
- private static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer;
+ @Manager
+ private VM managerVM;
- static List<Notification> notifList = new ArrayList<>();
+ @Member
+ private VM[] memberVMs;
- static Map<ObjectName, NotificationListener> notificationListenerMap =
- new HashMap<ObjectName, NotificationListener>();
+ @Rule
+ public ManagementTestRule managementTestRule = ManagementTestRule.builder().build();
- static final String WARNING_LEVEL_MESSAGE = "Warninglevel Alert Message";
-
- static final String SEVERE_LEVEL_MESSAGE = "Severelevel Alert Message";
+ @Before
+ public void before() throws Exception {
+ notifications = new ArrayList<>();
+ notificationListenerMap = new HashMap<>();
+ invokeInEveryVM(() -> notifications = new ArrayList<>());
+ invokeInEveryVM(() -> notificationListenerMap = new HashMap<>());
+ }
- public DistributedSystemDUnitTest() {
- super();
+ @After
+ public void after() throws Exception {
+ resetAlertCounts(this.managerVM);
}
/**
* Tests each and every operations that is defined on the MemberMXBean
- *
- * @throws Exception
*/
@Test
public void testDistributedSystemAggregate() throws Exception {
- VM managingNode = getManagingNode();
- createManagementCache(managingNode);
- startManagingNode(managingNode);
- addNotificationListener(managingNode);
+ this.managementTestRule.createManager(this.managerVM);
+ addNotificationListener(this.managerVM);
- for (VM vm : getManagedNodeList()) {
- createCache(vm);
+ for (VM memberVM : this.memberVMs) {
+ this.managementTestRule.createMember(memberVM);
}
- checkAggregate(managingNode);
- for (VM vm : getManagedNodeList()) {
- closeCache(vm);
- }
-
- closeCache(managingNode);
-
+ verifyDistributedSystemMXBean(this.managerVM);
}
/**
* Tests each and every operations that is defined on the MemberMXBean
- *
- * @throws Exception
*/
@Test
public void testAlertManagedNodeFirst() throws Exception {
-
- for (VM vm : getManagedNodeList()) {
- createCache(vm);
- warnLevelAlert(vm);
- severeLevelAlert(vm);
+ for (VM memberVM : this.memberVMs) {
+ this.managementTestRule.createMember(memberVM);
+ generateWarningAlert(memberVM);
+ generateSevereAlert(memberVM);
}
- VM managingNode = getManagingNode();
-
- createManagementCache(managingNode);
- startManagingNode(managingNode);
- addAlertListener(managingNode);
- checkAlertCount(managingNode, 0, 0);
+ this.managementTestRule.createManager(this.managerVM);
+ addAlertListener(this.managerVM);
+ verifyAlertCount(this.managerVM, 0, 0);
- final DistributedMember managingMember = getMember(managingNode);
+ DistributedMember managerDistributedMember = this.managementTestRule.getDistributedMember(this.managerVM);
- // Before we start we need to ensure that the initial (implicit) SEVERE alert has propagated
- // everywhere.
- for (VM vm : getManagedNodeList()) {
- ensureLoggerState(vm, managingMember, Alert.SEVERE);
+ // Before we start we need to ensure that the initial (implicit) SEVERE alert has propagated everywhere.
+ for (VM memberVM : this.memberVMs) {
+ verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE);
}
- setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.WARNING));
+ setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.WARNING));
- for (VM vm : getManagedNodeList()) {
- ensureLoggerState(vm, managingMember, Alert.WARNING);
- warnLevelAlert(vm);
- severeLevelAlert(vm);
+ for (VM memberVM : this.memberVMs) {
+ verifyAlertAppender(memberVM, managerDistributedMember, Alert.WARNING);
+ generateWarningAlert(memberVM);
+ generateSevereAlert(memberVM);
}
- checkAlertCount(managingNode, 3, 3);
- resetAlertCounts(managingNode);
+ verifyAlertCount(this.managerVM, 3, 3);
+ resetAlertCounts(this.managerVM);
- setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.SEVERE));
+ setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.SEVERE));
- for (VM vm : getManagedNodeList()) {
- ensureLoggerState(vm, managingMember, Alert.SEVERE);
- warnLevelAlert(vm);
- severeLevelAlert(vm);
+ for (VM memberVM : this.memberVMs) {
+ verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE);
+ generateWarningAlert(memberVM);
+ generateSevereAlert(memberVM);
}
- checkAlertCount(managingNode, 3, 0);
- resetAlertCounts(managingNode);
-
- for (VM vm : getManagedNodeList()) {
- closeCache(vm);
- }
-
- closeCache(managingNode);
- }
-
- @SuppressWarnings("serial")
- public void ensureLoggerState(VM vm1, final DistributedMember member, final int alertLevel)
- throws Exception {
- {
- vm1.invoke(new SerializableCallable("Ensure Logger State") {
-
- public Object call() throws Exception {
-
- Wait.waitForCriterion(new WaitCriterion() {
- public String description() {
- return "Waiting for all alert Listener to register with managed node";
- }
-
- public boolean done() {
-
- if (AlertAppender.getInstance().hasAlertListener(member, alertLevel)) {
- return true;
- }
- return false;
- }
-
- }, MAX_WAIT, 500, true);
-
- return null;
- }
- });
-
- }
+ verifyAlertCount(this.managerVM, 3, 0);
}
/**
* Tests each and every operations that is defined on the MemberMXBean
- *
- * @throws Exception
*/
@Test
public void testShutdownAll() throws Exception {
- final Host host = Host.getHost(0);
- VM managedNode1 = host.getVM(0);
- VM managedNode2 = host.getVM(1);
- VM managedNode3 = host.getVM(2);
-
- VM managingNode = host.getVM(3);
-
- // Managing Node is created first
- createManagementCache(managingNode);
- startManagingNode(managingNode);
-
- createCache(managedNode1);
- createCache(managedNode2);
- createCache(managedNode3);
- shutDownAll(managingNode);
- closeCache(managingNode);
+ VM memberVM1 = getHost(0).getVM(0);
+ VM memberVM2 = getHost(0).getVM(1);
+ VM memberVM3 = getHost(0).getVM(2);
+
+ VM managerVM = getHost(0).getVM(3);
+
+ // managerVM Node is created first
+ this.managementTestRule.createManager(managerVM);
+
+ this.managementTestRule.createMember(memberVM1);
+ this.managementTestRule.createMember(memberVM2);
+ this.managementTestRule.createMember(memberVM3);
+
+ shutDownAll(managerVM);
}
@Test
public void testNavigationAPIS() throws Exception {
+ this.managementTestRule.createManager(this.managerVM);
- final Host host = Host.getHost(0);
-
- createManagementCache(managingNode);
- startManagingNode(managingNode);
-
- for (VM vm : managedNodeList) {
- createCache(vm);
+ for (VM memberVM : this.memberVMs) {
+ this.managementTestRule.createMember(memberVM);
}
- checkNavigationAPIs(managingNode);
+ verifyFetchMemberObjectName(this.managerVM, this.memberVMs.length + 1);
}
@Test
public void testNotificationHub() throws Exception {
- this.initManagement(false);
+ this.managementTestRule.createMembers();
+ this.managementTestRule.createManagers();
class NotificationHubTestListener implements NotificationListener {
+
@Override
public synchronized void handleNotification(Notification notification, Object handback) {
logger.info("Notification received {}", notification);
- notifList.add(notification);
+ notifications.add(notification);
}
}
- managingNode.invoke(new SerializableRunnable("Add Listener to MemberMXBean") {
-
- public void run() {
- Cache cache = getCache();
- ManagementService service = getManagementService();
- final DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-
- Wait.waitForCriterion(new WaitCriterion() {
- public String description() {
- return "Waiting for all members to send their initial Data";
- }
-
- public boolean done() {
- if (bean.listMemberObjectNames().length == 5) {// including locator
- return true;
- } else {
- return false;
- }
- }
- }, MAX_WAIT, 500, true);
- for (ObjectName objectName : bean.listMemberObjectNames()) {
- NotificationHubTestListener listener = new NotificationHubTestListener();
- try {
- mbeanServer.addNotificationListener(objectName, listener, null, null);
- notificationListenerMap.put(objectName, listener);
- } catch (InstanceNotFoundException e) {
- LogWriterUtils.getLogWriter().error(e);
- }
- }
+ this.managerVM.invoke("addListenerToMemberMXBean", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
+ final DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+
+ await().until(() -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(5));
+
+ for (ObjectName objectName : distributedSystemMXBean.listMemberObjectNames()) {
+ NotificationHubTestListener listener = new NotificationHubTestListener();
+ ManagementFactory.getPlatformMBeanServer().addNotificationListener(objectName, listener, null, null);
+ notificationListenerMap.put(objectName, listener);
}
});
// Check in all VMS
- for (VM vm : managedNodeList) {
- vm.invoke(new SerializableRunnable("Check Hub Listener num count") {
-
- public void run() {
- Cache cache = getCache();
- SystemManagementService service = (SystemManagementService) getManagementService();
- NotificationHub hub = service.getNotificationHub();
- Map<ObjectName, NotificationHubListener> listenerObjectMap = hub.getListenerObjectMap();
- assertEquals(1, listenerObjectMap.keySet().size());
- ObjectName memberMBeanName = MBeanJMXAdapter
- .getMemberMBeanName(cache.getDistributedSystem().getDistributedMember());
+ for (VM memberVM : this.memberVMs) {
+ memberVM.invoke("checkNotificationHubListenerCount", () -> {
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
+ NotificationHub notificationHub = service.getNotificationHub();
+ Map<ObjectName, NotificationHubListener> listenerMap = notificationHub.getListenerObjectMap();
+ assertThat(listenerMap.keySet()).hasSize(1);
- NotificationHubListener listener = listenerObjectMap.get(memberMBeanName);
+ ObjectName memberMBeanName = MBeanJMXAdapter.getMemberMBeanName(this.managementTestRule.getDistributedMember());
+ NotificationHubListener listener = listenerMap.get(memberMBeanName);
- /*
- * Counter of listener should be 2 . One for default Listener which is added for each
- * member mbean by distributed system mbean One for the added listener in test
- */
- assertEquals(2, listener.getNumCounter());
+ /*
+ * Counter of listener should be 2 . One for default Listener which is
+ * added for each member mbean by distributed system mbean One for the
+ * added listener in test
+ */
+ assertThat(listener.getNumCounter()).isEqualTo(2);
- // Raise some notifications
+ // Raise some notifications
- NotificationBroadcasterSupport memberLevelNotifEmitter =
- (MemberMBean) service.getMemberMXBean();
+ NotificationBroadcasterSupport notifier = (MemberMBean) service.getMemberMXBean();
+ String memberSource = MBeanJMXAdapter.getMemberNameOrId(this.managementTestRule.getDistributedMember());
- String memberSource = MBeanJMXAdapter
- .getMemberNameOrId(cache.getDistributedSystem().getDistributedMember());
-
- // Only a dummy notification , no actual region is creates
- Notification notification = new Notification(JMXNotificationType.REGION_CREATED,
- memberSource, SequenceNumber.next(), System.currentTimeMillis(),
- ManagementConstants.REGION_CREATED_PREFIX + "/test");
- memberLevelNotifEmitter.sendNotification(notification);
-
- }
+ // Only a dummy notification , no actual region is created
+ Notification notification = new Notification(JMXNotificationType.REGION_CREATED, memberSource, SequenceNumber.next(), System.currentTimeMillis(), ManagementConstants.REGION_CREATED_PREFIX + "/test");
+ notifier.sendNotification(notification);
});
}
- managingNode.invoke(new SerializableRunnable("Check notifications && Remove Listeners") {
-
- public void run() {
-
- Wait.waitForCriterion(new WaitCriterion() {
- public String description() {
- return "Waiting for all Notifications to reach the Managing Node";
- }
-
- public boolean done() {
- if (notifList.size() == 3) {
- return true;
- } else {
- return false;
- }
- }
- }, MAX_WAIT, 500, true);
+ this.managerVM.invoke("checkNotificationsAndRemoveListeners", () -> {
+ await().until(() -> assertThat(notifications).hasSize(3));
- notifList.clear();
-
- Iterator<ObjectName> it = notificationListenerMap.keySet().iterator();
- while (it.hasNext()) {
- ObjectName objectName = it.next();
- NotificationListener listener = notificationListenerMap.get(objectName);
- try {
- mbeanServer.removeNotificationListener(objectName, listener);
- } catch (ListenerNotFoundException e) {
- LogWriterUtils.getLogWriter().error(e);
- } catch (InstanceNotFoundException e) {
- LogWriterUtils.getLogWriter().error(e);
- }
- }
+ notifications.clear();
+ for (ObjectName objectName : notificationListenerMap.keySet()) {
+ NotificationListener listener = notificationListenerMap.get(objectName);
+ ManagementFactory.getPlatformMBeanServer().removeNotificationListener(objectName, listener);
}
});
// Check in all VMS again
- for (VM vm : managedNodeList) {
- vm.invoke(new SerializableRunnable("Check Hub Listener num count Again") {
-
- public void run() {
- Cache cache = getCache();
- SystemManagementService service = (SystemManagementService) getManagementService();
- NotificationHub hub = service.getNotificationHub();
- Map<ObjectName, NotificationHubListener> listenerObjectMap = hub.getListenerObjectMap();
-
- assertEquals(1, listenerObjectMap.keySet().size());
-
- ObjectName memberMBeanName = MBeanJMXAdapter
- .getMemberMBeanName(cache.getDistributedSystem().getDistributedMember());
-
- NotificationHubListener listener = listenerObjectMap.get(memberMBeanName);
-
- /*
- * Counter of listener should be 1 for the default Listener which is added for each member
- * mbean by distributed system mbean.
- */
- assertEquals(1, listener.getNumCounter());
-
- }
+ for (VM memberVM : this.memberVMs) {
+ memberVM.invoke("checkNotificationHubListenerCountAgain", () -> {
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
+ NotificationHub hub = service.getNotificationHub();
+ Map<ObjectName, NotificationHubListener> listenerObjectMap = hub.getListenerObjectMap();
+ assertThat(listenerObjectMap.keySet().size()).isEqualTo(1);
+
+ ObjectName memberMBeanName = MBeanJMXAdapter.getMemberMBeanName(this.managementTestRule.getDistributedMember());
+ NotificationHubListener listener = listenerObjectMap.get(memberMBeanName);
+
+ /*
+ * Counter of listener should be 1 for the default Listener which is
+ * added for each member mbean by distributed system mbean.
+ */
+ assertThat(listener.getNumCounter()).isEqualTo(1);
});
}
- managingNode.invoke(new SerializableRunnable("Remove Listener from MemberMXBean") {
-
- public void run() {
- Cache cache = getCache();
- ManagementService service = getManagementService();
- final DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-
- Wait.waitForCriterion(new WaitCriterion() {
- public String description() {
- return "Waiting for all members to send their initial Data";
- }
-
- public boolean done() {
- if (bean.listMemberObjectNames().length == 5) {// including locator
- return true;
- } else {
- return false;
- }
-
- }
-
- }, MAX_WAIT, 500, true);
- for (ObjectName objectName : bean.listMemberObjectNames()) {
- NotificationHubTestListener listener = new NotificationHubTestListener();
- try {
- mbeanServer.removeNotificationListener(objectName, listener);
- } catch (InstanceNotFoundException e) {
- LogWriterUtils.getLogWriter().error(e);
- } catch (ListenerNotFoundException e) {
- // TODO: apparently there is never a notification listener on any these mbeans at this
- // point
- // fix this test so it doesn't hit these unexpected exceptions --
- // getLogWriter().error(e);
- }
+ this.managerVM.invoke("removeListenerFromMemberMXBean", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
+ DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+
+ await().until(() -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(5));
+
+ for (ObjectName objectName : distributedSystemMXBean.listMemberObjectNames()) {
+ NotificationHubTestListener listener = new NotificationHubTestListener();
+ try {
+ ManagementFactory.getPlatformMBeanServer().removeNotificationListener(objectName, listener); // because new instance!!
+ } catch (ListenerNotFoundException e) {
+ // TODO: [old] apparently there is never a notification listener on any these mbeans at this point [fix this]
+ // fix this test so it doesn't hit these unexpected exceptions -- getLogWriter().error(e);
}
}
});
- for (VM vm : managedNodeList) {
- vm.invoke(new SerializableRunnable("Check Hub Listeners clean up") {
-
- public void run() {
- Cache cache = getCache();
- SystemManagementService service = (SystemManagementService) getManagementService();
- NotificationHub hub = service.getNotificationHub();
- hub.cleanUpListeners();
- assertEquals(0, hub.getListenerObjectMap().size());
-
- Iterator<ObjectName> it = notificationListenerMap.keySet().iterator();
- while (it.hasNext()) {
- ObjectName objectName = it.next();
- NotificationListener listener = notificationListenerMap.get(objectName);
- try {
- mbeanServer.removeNotificationListener(objectName, listener);
- fail("Found Listeners inspite of clearing them");
- } catch (ListenerNotFoundException e) {
- // Expected Exception Do nothing
- } catch (InstanceNotFoundException e) {
- LogWriterUtils.getLogWriter().error(e);
- }
- }
+ for (VM memberVM : this.memberVMs) {
+ memberVM.invoke("verifyNotificationHubListenersWereRemoved", () -> {
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
+ NotificationHub notificationHub = service.getNotificationHub();
+ notificationHub.cleanUpListeners();
+ assertThat(notificationHub.getListenerObjectMap()).isEmpty();
+
+ for (ObjectName objectName : notificationListenerMap.keySet()) {
+ NotificationListener listener = notificationListenerMap.get(objectName);
+ assertThatThrownBy(() -> ManagementFactory.getPlatformMBeanServer().removeNotificationListener(objectName, listener)).isExactlyInstanceOf(ListenerNotFoundException.class);
}
});
}
@@ -486,404 +328,212 @@ public class DistributedSystemDUnitTest extends ManagementTestBase {
/**
* Tests each and every operations that is defined on the MemberMXBean
- *
- * @throws Exception
*/
@Test
public void testAlert() throws Exception {
- VM managingNode = getManagingNode();
-
- createManagementCache(managingNode);
- startManagingNode(managingNode);
- addAlertListener(managingNode);
- resetAlertCounts(managingNode);
+ this.managementTestRule.createManager(this.managerVM);
+ addAlertListener(this.managerVM);
+ resetAlertCounts(this.managerVM);
- final DistributedMember managingMember = getMember(managingNode);
+ DistributedMember managerDistributedMember = this.managementTestRule.getDistributedMember(this.managerVM);
+ generateWarningAlert(this.managerVM);
+ generateSevereAlert(this.managerVM);
+ verifyAlertCount(this.managerVM, 1, 0);
+ resetAlertCounts(this.managerVM);
+ for (VM memberVM : this.memberVMs) {
+ this.managementTestRule.createMember(memberVM);
- warnLevelAlert(managingNode);
- severeLevelAlert(managingNode);
- checkAlertCount(managingNode, 1, 0);
- resetAlertCounts(managingNode);
-
- for (VM vm : getManagedNodeList()) {
-
- createCache(vm);
- // Default is severe ,So only Severe level alert is expected
-
- ensureLoggerState(vm, managingMember, Alert.SEVERE);
-
- warnLevelAlert(vm);
- severeLevelAlert(vm);
+ verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE);
+ generateWarningAlert(memberVM);
+ generateSevereAlert(memberVM);
}
- checkAlertCount(managingNode, 3, 0);
- resetAlertCounts(managingNode);
- setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.WARNING));
+ verifyAlertCount(this.managerVM, 3, 0);
+ resetAlertCounts(this.managerVM);
+ setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.WARNING));
- for (VM vm : getManagedNodeList()) {
- // warning and severe alerts both are to be checked
- ensureLoggerState(vm, managingMember, Alert.WARNING);
- warnLevelAlert(vm);
- severeLevelAlert(vm);
+ for (VM memberVM : this.memberVMs) {
+ verifyAlertAppender(memberVM, managerDistributedMember, Alert.WARNING);
+ generateWarningAlert(memberVM);
+ generateSevereAlert(memberVM);
}
- checkAlertCount(managingNode, 3, 3);
-
- resetAlertCounts(managingNode);
+ verifyAlertCount(this.managerVM, 3, 3);
- setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.OFF));
+ resetAlertCounts(this.managerVM);
- for (VM vm : getManagedNodeList()) {
- ensureLoggerState(vm, managingMember, Alert.OFF);
- warnLevelAlert(vm);
- severeLevelAlert(vm);
- }
- checkAlertCount(managingNode, 0, 0);
- resetAlertCounts(managingNode);
+ setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.OFF));
- for (VM vm : getManagedNodeList()) {
- closeCache(vm);
+ for (VM memberVM : this.memberVMs) {
+ verifyAlertAppender(memberVM, managerDistributedMember, Alert.OFF);
+ generateWarningAlert(memberVM);
+ generateSevereAlert(memberVM);
}
- closeCache(managingNode);
-
+ verifyAlertCount(this.managerVM, 0, 0);
}
- @SuppressWarnings("serial")
- public void checkAlertCount(VM vm1, final int expectedSevereAlertCount,
- final int expectedWarningAlertCount) throws Exception {
- {
- vm1.invoke(new SerializableCallable("Check Alert Count") {
-
- public Object call() throws Exception {
- final AlertNotifListener nt = AlertNotifListener.getInstance();
- Wait.waitForCriterion(new WaitCriterion() {
- public String description() {
- return "Waiting for all alerts to reach the Managing Node";
- }
-
- public boolean done() {
- if (expectedSevereAlertCount == nt.getseverAlertCount()
- && expectedWarningAlertCount == nt.getWarnigAlertCount()) {
- return true;
- } else {
- return false;
- }
-
- }
-
- }, MAX_WAIT, 500, true);
-
- return null;
- }
- });
-
- }
+ private void verifyAlertAppender(final VM memberVM, final DistributedMember member, final int alertLevel) {
+ memberVM.invoke("verifyAlertAppender", () -> await().until(() -> assertThat(AlertAppender.getInstance().hasAlertListener(member, alertLevel)).isTrue()));
}
+ private void verifyAlertCount(final VM managerVM, final int expectedSevereAlertCount, final int expectedWarningAlertCount) {
+ managerVM.invoke("verifyAlertCount", () -> {
+ AlertNotificationListener listener = AlertNotificationListener.getInstance();
-
- @SuppressWarnings("serial")
- public void setAlertLevel(VM vm1, final String alertLevel) throws Exception {
- {
- vm1.invoke(new SerializableCallable("Set Alert level") {
-
- public Object call() throws Exception {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- ManagementService service = getManagementService();
- DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
- assertNotNull(bean);
- bean.changeAlertLevel(alertLevel);
-
- return null;
- }
- });
-
- }
+ await().until(() -> assertThat(listener.getSevereAlertCount()).isEqualTo(expectedSevereAlertCount));
+ await().until(() -> assertThat(listener.getWarningAlertCount()).isEqualTo(expectedWarningAlertCount));
+ });
}
- @SuppressWarnings("serial")
- public void warnLevelAlert(VM vm1) throws Exception {
- {
- vm1.invoke(new SerializableCallable("Warning level Alerts") {
-
- public Object call() throws Exception {
- final IgnoredException warnEx =
- IgnoredException.addIgnoredException(WARNING_LEVEL_MESSAGE);
- logger.warn(WARNING_LEVEL_MESSAGE);
- warnEx.remove();
- return null;
- }
- });
-
- }
+ private void setAlertLevel(final VM managerVM, final String alertLevel) {
+ managerVM.invoke("setAlertLevel", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
+ DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+ distributedSystemMXBean.changeAlertLevel(alertLevel);
+ });
}
-
- @SuppressWarnings("serial")
- public void resetAlertCounts(VM vm1) throws Exception {
- {
- vm1.invoke(new SerializableCallable("Reset Alert Count") {
-
- public Object call() throws Exception {
- AlertNotifListener nt = AlertNotifListener.getInstance();
- nt.resetCount();
- return null;
- }
- });
-
- }
+ private void generateWarningAlert(final VM anyVM) {
+ anyVM.invoke("generateWarningAlert", () -> {
+ IgnoredException ignoredException = IgnoredException.addIgnoredException(WARNING_LEVEL_MESSAGE);
+ logger.warn(WARNING_LEVEL_MESSAGE);
+ ignoredException.remove();
+ });
}
- @SuppressWarnings("serial")
- public void severeLevelAlert(VM vm1) throws Exception {
- {
- vm1.invoke(new SerializableCallable("Severe Level Alert") {
-
- public Object call() throws Exception {
- // add expected exception strings
-
- final IgnoredException severeEx =
- IgnoredException.addIgnoredException(SEVERE_LEVEL_MESSAGE);
- logger.fatal(SEVERE_LEVEL_MESSAGE);
- severeEx.remove();
- return null;
- }
- });
-
- }
+ private void resetAlertCounts(final VM managerVM) {
+ managerVM.invoke("resetAlertCounts", () -> {
+ AlertNotificationListener listener = AlertNotificationListener.getInstance();
+ listener.resetCount();
+ });
}
- @SuppressWarnings("serial")
- public void addAlertListener(VM vm1) throws Exception {
- {
- vm1.invoke(new SerializableCallable("Add Alert Listener") {
-
- public Object call() throws Exception {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- ManagementService service = getManagementService();
- DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
- AlertNotifListener nt = AlertNotifListener.getInstance();
- nt.resetCount();
-
- NotificationFilter notificationFilter = new NotificationFilter() {
- @Override
- public boolean isNotificationEnabled(Notification notification) {
- return notification.getType().equals(JMXNotificationType.SYSTEM_ALERT);
- }
+ private void generateSevereAlert(final VM anyVM) {
+ anyVM.invoke("generateSevereAlert", () -> {
+ IgnoredException ignoredException = IgnoredException.addIgnoredException(SEVERE_LEVEL_MESSAGE);
+ logger.fatal(SEVERE_LEVEL_MESSAGE);
+ ignoredException.remove();
+ });
+ }
- };
+ private void addAlertListener(final VM managerVM) {
+ managerVM.invoke("addAlertListener", () -> {
+ AlertNotificationListener listener = AlertNotificationListener.getInstance();
+ listener.resetCount();
- mbeanServer.addNotificationListener(MBeanJMXAdapter.getDistributedSystemName(), nt,
- notificationFilter, null);
+ NotificationFilter notificationFilter = (Notification notification) -> notification.getType().equals(JMXNotificationType.SYSTEM_ALERT);
- return null;
- }
- });
-
- }
+ ManagementFactory.getPlatformMBeanServer().addNotificationListener(MBeanJMXAdapter.getDistributedSystemName(), listener, notificationFilter, null);
+ });
}
/**
* Check aggregate related functions and attributes
- *
- * @param vm1
- * @throws Exception
*/
- @SuppressWarnings("serial")
- public void checkAggregate(VM vm1) throws Exception {
- {
- vm1.invoke(new SerializableCallable("Chech Aggregate Attributes") {
-
- public Object call() throws Exception {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ private void verifyDistributedSystemMXBean(final VM managerVM) {
+ managerVM.invoke("verifyDistributedSystemMXBean", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
+ DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
- ManagementService service = getManagementService();
+ await().until(() -> assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5));
- final DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
- assertNotNull(service.getDistributedSystemMXBean());
-
- Wait.waitForCriterion(new WaitCriterion() {
- public String description() {
- return "Waiting All members to intitialize DistributedSystemMBean expect 5 but found "
- + bean.getMemberCount();
- }
-
- public boolean done() {
- // including locator
- if (bean.getMemberCount() == 5) {
- return true;
- } else {
- return false;
- }
-
- }
-
- }, MAX_WAIT, 500, true);
-
-
-
- final Set<DistributedMember> otherMemberSet =
- cache.getDistributionManager().getOtherNormalDistributionManagerIds();
- Iterator<DistributedMember> memberIt = otherMemberSet.iterator();
- while (memberIt.hasNext()) {
- DistributedMember member = memberIt.next();
- LogWriterUtils.getLogWriter().info("JVM Metrics For Member " + member.getId() + ":"
- + bean.showJVMMetrics(member.getId()));
- LogWriterUtils.getLogWriter().info("OS Metrics For Member " + member.getId() + ":"
- + bean.showOSMetrics(member.getId()));
- }
-
- return null;
- }
- });
-
- }
+ Set<DistributedMember> otherMemberSet = this.managementTestRule.getOtherNormalMembers();
+ for (DistributedMember member : otherMemberSet) {
+ // TODO: need assertions? JVMMetrics and OSMetrics
+ }
+ });
}
- @SuppressWarnings("serial")
- public void addNotificationListener(VM vm1) throws Exception {
- {
- vm1.invoke(new SerializableCallable("Add Notification Listener") {
-
- public Object call() throws Exception {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- ManagementService service = getManagementService();
- DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
- assertNotNull(bean);
- TestDistributedSystemNotif nt = new TestDistributedSystemNotif();
- mbeanServer.addNotificationListener(MBeanJMXAdapter.getDistributedSystemName(), nt, null,
- null);
-
- return null;
- }
- });
+ private void addNotificationListener(final VM managerVM) {
+ managerVM.invoke("addNotificationListener", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
+ DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+ assertThat(distributedSystemMXBean).isNotNull();
- }
+ DistributedSystemNotificationListener listener = new DistributedSystemNotificationListener();
+ ManagementFactory.getPlatformMBeanServer().addNotificationListener(MBeanJMXAdapter.getDistributedSystemName(), listener, null, null);
+ });
}
+ private void shutDownAll(final VM managerVM) {
+ managerVM.invoke("shutDownAll", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
+ DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+ distributedSystemMXBean.shutDownAllMembers();
-
- @SuppressWarnings("serial")
- public void shutDownAll(VM vm1) throws Exception {
- {
- vm1.invoke(new SerializableCallable("Shut Down All") {
-
- public Object call() throws Exception {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- ManagementService service = getManagementService();
- DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
- assertNotNull(service.getDistributedSystemMXBean());
- bean.shutDownAllMembers();
- Wait.pause(2000);
- assertEquals(cache.getDistributedSystem().getAllOtherMembers().size(), 1);
- return null;
- }
- });
-
- }
+ await().until(() -> assertThat(this.managementTestRule.getOtherNormalMembers()).hasSize(0));
+ });
}
+ private void verifyFetchMemberObjectName(final VM managerVM, final int memberCount) {
+ managerVM.invoke("verifyFetchMemberObjectName", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
+ DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean();
+ await().until(() -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(memberCount));
- @SuppressWarnings("serial")
- public void checkNavigationAPIs(VM vm1) throws Exception {
- {
- vm1.invoke(new SerializableCallable("Check Navigation APIS") {
-
- public Object call() throws Exception {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- ManagementService service = getManagementService();
- final DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
-
- assertNotNull(service.getDistributedSystemMXBean());
-
- waitForAllMembers(4);
-
- for (int i = 0; i < bean.listMemberObjectNames().length; i++) {
- LogWriterUtils.getLogWriter()
- .info("ObjectNames Of the Mmeber" + bean.listMemberObjectNames()[i]);
- }
-
-
- ObjectName thisMemberName = MBeanJMXAdapter.getMemberMBeanName(
- InternalDistributedSystem.getConnectedInstance().getDistributedMember().getId());
-
- ObjectName memberName = bean.fetchMemberObjectName(
- InternalDistributedSystem.getConnectedInstance().getDistributedMember().getId());
- assertEquals(thisMemberName, memberName);
-
- return null;
- }
- });
-
- }
+ String memberId = this.managementTestRule.getDistributedMember().getId();
+ ObjectName thisMemberName = MBeanJMXAdapter.getMemberMBeanName(memberId);
+ ObjectName memberName = distributedSystemMXBean.fetchMemberObjectName(memberId);
+ assertThat(memberName).isEqualTo(thisMemberName);
+ });
}
+ private ConditionFactory await() {
+ return Awaitility.await().atMost(2, MINUTES);
+ }
- /**
- * Notification handler
- *
- *
- */
- private static class TestDistributedSystemNotif implements NotificationListener {
+ private static class DistributedSystemNotificationListener implements NotificationListener {
@Override
- public void handleNotification(Notification notification, Object handback) {
- assertNotNull(notification);
+ public void handleNotification(final Notification notification, final Object handback) {
+ assertThat(notification).isNotNull();
}
-
}
- /**
- * Notification handler
- *
- *
- */
- private static class AlertNotifListener implements NotificationListener {
+ private static class AlertNotificationListener implements NotificationListener {
+
+ private static AlertNotificationListener listener = new AlertNotificationListener();
+
+ private int warningAlertCount = 0;
- private static AlertNotifListener listener = new AlertNotifListener();
+ private int severeAlertCount = 0;
- public static AlertNotifListener getInstance() {
+ static AlertNotificationListener getInstance() { // TODO: get rid of singleton
return listener;
}
- private int warnigAlertCount = 0;
+ @Override
+ public synchronized void handleNotification(final Notification notification, final Object handback) {
+ assertThat(notification).isNotNull();
- private int severAlertCount = 0;
+ Map<String, String> notificationUserData = (Map<String, String>) notification.getUserData();
- @Override
- public synchronized void handleNotification(Notification notification, Object handback) {
- assertNotNull(notification);
- logger.info("Notification received {}", notification);
- Map<String, String> notifUserData = (Map<String, String>) notification.getUserData();
- if (notifUserData.get(JMXNotificationUserData.ALERT_LEVEL).equalsIgnoreCase("warning")) {
- assertEquals(WARNING_LEVEL_MESSAGE, notification.getMessage());
- ++warnigAlertCount;
+ if (notificationUserData.get(JMXNotificationUserData.ALERT_LEVEL).equalsIgnoreCase("warning")) {
+ assertThat(notification.getMessage()).isEqualTo(WARNING_LEVEL_MESSAGE);
+ warningAlertCount++;
}
- if (notifUserData.get(JMXNotificationUserData.ALERT_LEVEL).equalsIgnoreCase("severe")) {
- assertEquals(SEVERE_LEVEL_MESSAGE, notification.getMessage());
- ++severAlertCount;
+ if (notificationUserData.get(JMXNotificationUserData.ALERT_LEVEL).equalsIgnoreCase("severe")) {
+ assertThat(notification.getMessage()).isEqualTo(SEVERE_LEVEL_MESSAGE);
+ severeAlertCount++;
}
}
- public void resetCount() {
- warnigAlertCount = 0;
-
- severAlertCount = 0;
+ void resetCount() {
+ warningAlertCount = 0;
+ severeAlertCount = 0;
}
- public int getWarnigAlertCount() {
- return warnigAlertCount;
+ int getWarningAlertCount() {
+ return warningAlertCount;
}
- public int getseverAlertCount() {
- return severAlertCount;
+ int getSevereAlertCount() {
+ return severeAlertCount;
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/geode/blob/c3586a96/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java
index 0feb4c2..0096f0d 100644
--- a/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java
@@ -49,7 +49,7 @@ import org.apache.geode.test.junit.categories.FlakyTest;
import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
import org.apache.geode.util.test.TestUtil;
-public class JMXMBeanDUnitTest extends DistributedTestCase {
+public class JMXMBeanDUnitTest extends DistributedTestCase { // TODO: rename and fix on Mac
private Host host;
private VM locator;