You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/12/12 22:36:43 UTC
[40/46] geode git commit: Convert from ManagementTestCase to
ManagementTestRule
http://git-wip-us.apache.org/repos/asf/geode/blob/c3586a96/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
index a3c8b27..2c7ae07 100644
--- a/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java
@@ -14,13 +14,25 @@
*/
package org.apache.geode.management;
+import static java.util.concurrent.TimeUnit.*;
import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.apache.geode.test.dunit.Assert.*;
+import static org.apache.geode.test.dunit.Host.*;
+import static org.apache.geode.test.dunit.IgnoredException.*;
+import static org.apache.geode.test.dunit.Invoke.*;
+import static org.apache.geode.test.dunit.NetworkUtils.*;
+import static org.assertj.core.api.Assertions.*;
+import java.io.Serializable;
import java.util.Collection;
-import java.util.Iterator;
import java.util.Properties;
+import javax.management.ObjectName;
+
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.core.ConditionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -29,22 +41,17 @@ import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
-import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.management.internal.SystemManagementService;
import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DistributedTest;
import org.apache.geode.test.junit.categories.FlakyTest;
@@ -52,415 +59,343 @@ import org.apache.geode.test.junit.categories.FlakyTest;
* Client health stats check
*/
@Category(DistributedTest.class)
-@SuppressWarnings("serial")
-public class ClientHealthStatsDUnitTest extends JUnit4DistributedTestCase {
-
- private static final String k1 = "k1";
- private static final String k2 = "k2";
- private static final String client_k1 = "client-k1";
- private static final String client_k2 = "client-k2";
+@SuppressWarnings({ "serial", "unused" })
+public class ClientHealthStatsDUnitTest implements Serializable {
- /** name of the test region */
- private static final String REGION_NAME = "ClientHealthStatsDUnitTest_Region";
+ private static final int NUMBER_PUTS = 100;
- private static VM client = null;
- private static VM client2 = null;
- private static VM managingNode = null;
+ private static final String KEY1 = "KEY1";
+ private static final String KEY2 = "KEY2";
+ private static final String VALUE1 = "VALUE1";
+ private static final String VALUE2 = "VALUE2";
- private static ManagementTestBase helper = new ManagementTestBase() {};
+ private static final String REGION_NAME = ClientHealthStatsDUnitTest.class.getSimpleName() + "_Region";
- private static int numOfCreates = 0;
- private static int numOfUpdates = 0;
- private static int numOfInvalidates = 0;
- private static boolean lastKeyReceived = false;
+ // client1VM and client2VM VM fields
+ private static ClientCache clientCache;
- private static GemFireCacheImpl cache = null;
+ // TODO: assert following values in each client VM
+ private static int numOfCreates;
+ private static int numOfUpdates;
+ private static int numOfInvalidates;
+ private static boolean lastKeyReceived;
- private VM server = null;
+ private VM managerVM;
+ private VM serverVM;
+ private VM client1VM;
+ private VM client2VM;
- @Override
- public final void postSetUp() throws Exception {
- disconnectAllFromDS();
+ private String hostName;
- final Host host = Host.getHost(0);
- managingNode = host.getVM(0);
- server = host.getVM(1);
- client = host.getVM(2);
- client2 = host.getVM(3);
+ @Rule
+ public ManagementTestRule managementTestRule = ManagementTestRule.builder().build();
- IgnoredException.addIgnoredException("Connection reset");
- }
+ @Before
+ public void before() throws Exception {
+ this.hostName = getServerHostName(getHost(0));
- @Override
- public final void preTearDown() throws Exception {
- reset();
- helper.closeCache(managingNode);
- helper.closeCache(client);
- helper.closeCache(client2);
- helper.closeCache(server);
+ this.managerVM = getHost(0).getVM(0);
+ this.serverVM = getHost(0).getVM(1);
+ this.client1VM = getHost(0).getVM(2);
+ this.client2VM = getHost(0).getVM(3);
- disconnectAllFromDS();
+ addIgnoredException("Connection reset");
}
- private static void reset() throws Exception {
- lastKeyReceived = false;
- numOfCreates = 0;
- numOfUpdates = 0;
- numOfInvalidates = 0;
+ @After
+ public void after() throws Exception {
+ invokeInEveryVM(() -> {
+ lastKeyReceived = false;
+ numOfCreates = 0;
+ numOfUpdates = 0;
+ numOfInvalidates = 0;
+ clientCache = null;
+ });
}
@Test
public void testClientHealthStats_SubscriptionEnabled() throws Exception {
- helper.createManagementCache(managingNode);
- helper.startManagingNode(managingNode);
+ this.managementTestRule.createManager(this.managerVM, false);
+ this.managementTestRule.startManager(this.managerVM);
- int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache());
+ int port = this.serverVM.invoke(() -> createServerCache());
- DistributedMember serverMember = helper.getMember(server);
+ this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true));
+ this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, true));
- client.invoke(
- () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, false));
+ this.client1VM.invoke(() -> put());
+ this.client2VM.invoke(() -> put());
- client2.invoke(
- () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 2, true, false));
+ DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM);
+ this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 2));
- client.invoke(() -> ClientHealthStatsDUnitTest.put());
- client2.invoke(() -> ClientHealthStatsDUnitTest.put());
-
- managingNode.invoke(() -> ClientHealthStatsDUnitTest.verifyClientStats(serverMember, port, 2));
- helper.stopManagingNode(managingNode);
+ this.managementTestRule.stopManager(this.managerVM);
}
@Test
public void testClientHealthStats_SubscriptionDisabled() throws Exception {
- helper.createManagementCache(managingNode);
- helper.startManagingNode(managingNode);
-
- int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache());
-
- DistributedMember serverMember = helper.getMember(server);
+ this.managementTestRule.createManager(this.managerVM, false);
+ this.managementTestRule.startManager(this.managerVM);
- client.invoke(() -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1,
- false, false));
+ int port = this.serverVM.invoke(() -> createServerCache());
- client2.invoke(() -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 2,
- false, false));
+ this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, false));
+ this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, false));
- client.invoke(() -> ClientHealthStatsDUnitTest.put());
- client2.invoke(() -> ClientHealthStatsDUnitTest.put());
+ this.client1VM.invoke(() -> put());
+ this.client2VM.invoke(() -> put());
- managingNode.invoke(() -> ClientHealthStatsDUnitTest.verifyClientStats(serverMember, port, 0));
- helper.stopManagingNode(managingNode);
+ DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM);
+ this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 0));
+ this.managementTestRule.stopManager(this.managerVM);
}
@Test
public void testClientHealthStats_DurableClient() throws Exception {
- helper.createManagementCache(managingNode);
- helper.startManagingNode(managingNode);
+ this.managementTestRule.createManager(this.managerVM, false);
+ this.managementTestRule.startManager(this.managerVM);
- int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache());
+ int port = this.serverVM.invoke(() -> createServerCache());
- DistributedMember serverMember = helper.getMember(server);
+ this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true));
+ this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, true));
- client.invoke(
- () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, true));
+ this.client1VM.invoke(() -> put());
+ this.client2VM.invoke(() -> put());
- client2.invoke(
- () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 2, true, true));
+ this.client1VM.invoke(() -> clientCache.close(true));
+ this.client2VM.invoke(() -> clientCache.close(true));
- client.invoke(() -> ClientHealthStatsDUnitTest.put());
- client2.invoke(() -> ClientHealthStatsDUnitTest.put());
-
- client.invoke(() -> ClientHealthStatsDUnitTest.closeClientCache());
-
- client2.invoke(() -> ClientHealthStatsDUnitTest.closeClientCache());
-
- managingNode.invoke(() -> ClientHealthStatsDUnitTest.verifyClientStats(serverMember, port, 2));
- helper.stopManagingNode(managingNode);
+ DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM);
+ this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 2));
+ this.managementTestRule.stopManager(this.managerVM);
}
- @Category(FlakyTest.class) // GEODE-337
@Test
public void testStatsMatchWithSize() throws Exception {
- // start a server
- int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache());
- // create durable client, with durable RI
- client.invoke(
- () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, false));
- // do puts on server from three different threads, pause after 500 puts each.
- server.invoke(() -> ClientHealthStatsDUnitTest.doPuts());
- // close durable client
- client.invoke(() -> ClientHealthStatsDUnitTest.closeClientCache());
-
- server.invoke("verifyProxyHasBeenPaused", () -> verifyProxyHasBeenPaused());
- // resume puts on server, add another 100.
- server.invokeAsync(() -> ClientHealthStatsDUnitTest.resumePuts());
- // start durable client
- client.invoke(
- () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, false));
- // wait for full queue dispatch
- client.invoke(() -> ClientHealthStatsDUnitTest.waitForLastKey());
- // verify the stats
- server.invoke(() -> ClientHealthStatsDUnitTest.verifyStats(port));
- }
+ int port = this.serverVM.invoke(() -> createServerCache()); // start a serverVM
- private static void verifyProxyHasBeenPaused() {
+ this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true)); // create durable client1VM, with durable RI
- WaitCriterion criterion = new WaitCriterion() {
+ this.serverVM.invoke(() -> doPuts()); // do puts on serverVM from three different threads, pause after 500 puts each.
- @Override
- public boolean done() {
- CacheClientNotifier ccn = CacheClientNotifier.getInstance();
- Collection<CacheClientProxy> ccProxies = ccn.getClientProxies();
+ this.client1VM.invoke(() -> clientCache.close(true)); // close durable client1VM
- Iterator<CacheClientProxy> itr = ccProxies.iterator();
+ this.serverVM.invoke(() -> await().atMost(2, MINUTES).until(() -> cacheClientProxyHasBeenPause()));
- while (itr.hasNext()) {
- CacheClientProxy ccp = itr.next();
- System.out.println("proxy status " + ccp.getState());
- if (ccp.isPaused())
- return true;
- }
- return false;
- }
+ this.serverVM.invoke(() -> resumePuts()); // resume puts on serverVM, add another 100.
- @Override
- public String description() {
- return "Proxy has not paused yet";
- }
- };
+ this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true)); // start durable client1VM
- Wait.waitForCriterion(criterion, 15 * 1000, 200, true);
+ this.client1VM.invoke(() -> await().atMost(1, MINUTES).until(() -> lastKeyReceived)); // wait for full queue dispatch
+
+ this.serverVM.invoke(() -> verifyStats(port)); // verify the stats
}
- private static int createServerCache() throws Exception {
- Cache cache = helper.createCache(false);
+ /**
+ * Invoked in serverVM
+ */
+ private boolean cacheClientProxyHasBeenPause() {
+ CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance(); // TODO
+ //CacheClientNotifier clientNotifier = ((CacheServerImpl)this.managementTestRule.getCache().getCacheServers().get(0)).getAcceptor().getCacheClientNotifier();
- RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
- rf.setConcurrencyChecksEnabled(false);
- rf.create(REGION_NAME);
+ Collection<CacheClientProxy> clientProxies = clientNotifier.getClientProxies();
- CacheServer server1 = cache.addCacheServer();
- server1.setPort(0);
- server1.start();
- return server1.getPort();
+ for (CacheClientProxy clientProxy: clientProxies) {
+ if (clientProxy.isPaused()) {
+ return true;
+ }
+ }
+ return false;
}
- private static void closeClientCache() throws Exception {
- cache.close(true);
+ /**
+ * Invoked in serverVM
+ */
+ private int createServerCache() throws Exception {
+ Cache cache = this.managementTestRule.getCache();
+
+ RegionFactory<String, String> regionFactory = cache.createRegionFactory(RegionShortcut.REPLICATE);
+ regionFactory.setConcurrencyChecksEnabled(false);
+ regionFactory.create(REGION_NAME);
+
+ CacheServer cacheServer = cache.addCacheServer();
+ cacheServer.setPort(0);
+ cacheServer.start();
+ return cacheServer.getPort();
}
- private static void createClientCache(Host host, Integer port, int clientNum,
- boolean subscriptionEnabled, boolean durable) throws Exception {
+ /**
+ * Invoked in client1VM and client2VM
+ */
+ private void createClientCache(final String hostName,
+ final Integer port,
+ final int clientNum,
+ final boolean subscriptionEnabled) throws Exception {
Properties props = new Properties();
- props.setProperty(DURABLE_CLIENT_ID, "durable-" + clientNum);
- props.setProperty(DURABLE_CLIENT_TIMEOUT, "300000");
- props.setProperty(LOG_LEVEL, "info");
- props.setProperty(STATISTIC_ARCHIVE_FILE,
- getTestMethodName() + "_client_" + clientNum + ".gfs");
props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
- ClientCacheFactory ccf = new ClientCacheFactory(props);
+ ClientCacheFactory cacheFactory = new ClientCacheFactory(props);
if (subscriptionEnabled) {
- ccf.setPoolSubscriptionEnabled(true);
- ccf.setPoolSubscriptionAckInterval(50);
- ccf.setPoolSubscriptionRedundancy(0);
+ cacheFactory.setPoolSubscriptionEnabled(true);
+ cacheFactory.setPoolSubscriptionAckInterval(50);
+ cacheFactory.setPoolSubscriptionRedundancy(0);
}
- if (durable) {
- ccf.set(DURABLE_CLIENT_ID, "DurableClientId_" + clientNum);
- ccf.set(DURABLE_CLIENT_TIMEOUT, "" + 300);
- }
+ cacheFactory.set(DURABLE_CLIENT_ID, "DurableClientId_" + clientNum);
+ cacheFactory.set(DURABLE_CLIENT_TIMEOUT, "" + 30000);
- ccf.addPoolServer(host.getHostName(), port);
- cache = (GemFireCacheImpl) ccf.create();
+ cacheFactory.addPoolServer(hostName, port);
+ clientCache = cacheFactory.create();
- ClientRegionFactory<String, String> crf =
- cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
- crf.setConcurrencyChecksEnabled(false);
+ ClientRegionFactory<String, String> regionFactory = clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+ regionFactory.setConcurrencyChecksEnabled(false);
- crf.addCacheListener(new CacheListenerAdapter<String, String>() {
- public void afterInvalidate(EntryEvent<String, String> event) {
- cache.getLoggerI18n()
- .fine("Invalidate Event: " + event.getKey() + ", " + event.getNewValue());
+ regionFactory.addCacheListener(new CacheListenerAdapter<String, String>() {
+ @Override
+ public void afterInvalidate(final EntryEvent<String, String> event) {
numOfInvalidates++;
}
- public void afterCreate(EntryEvent<String, String> event) {
- if (((String) event.getKey()).equals("last_key")) {
+ @Override
+ public void afterCreate(final EntryEvent<String, String> event) {
+ if ("last_key".equals(event.getKey())) {
lastKeyReceived = true;
}
- cache.getLoggerI18n().fine("Create Event: " + event.getKey() + ", " + event.getNewValue());
numOfCreates++;
}
- public void afterUpdate(EntryEvent<String, String> event) {
- cache.getLoggerI18n().fine("Update Event: " + event.getKey() + ", " + event.getNewValue());
+ @Override
+ public void afterUpdate(final EntryEvent<String, String> event) {
numOfUpdates++;
}
});
- Region<String, String> r = crf.create(REGION_NAME);
+ Region<String, String> region = regionFactory.create(REGION_NAME);
if (subscriptionEnabled) {
- r.registerInterest("ALL_KEYS", true);
- cache.readyForEvents();
+ region.registerInterest("ALL_KEYS", true);
+ clientCache.readyForEvents();
}
}
- private static void doPuts() throws Exception {
- Cache cache = GemFireCacheImpl.getInstance();
- final Region<String, String> r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- Thread t1 = new Thread(new Runnable() {
- public void run() {
- for (int i = 0; i < 500; i++) {
- r.put("T1_KEY_" + i, "VALUE_" + i);
- }
+ /**
+ * Invoked in serverVM
+ */
+ private void doPuts() throws Exception {
+ Cache cache = this.managementTestRule.getCache();
+ Region<String, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+
+ Thread thread1 = new Thread(() -> {
+ for (int i = 0; i < NUMBER_PUTS; i++) {
+ region.put("T1_KEY_" + i, "VALUE_" + i);
}
});
- Thread t2 = new Thread(new Runnable() {
- public void run() {
- for (int i = 0; i < 500; i++) {
- r.put("T2_KEY_" + i, "VALUE_" + i);
- }
+ Thread thread2 = new Thread(() -> {
+ for (int i = 0; i < NUMBER_PUTS; i++) {
+ region.put("T2_KEY_" + i, "VALUE_" + i);
}
});
- Thread t3 = new Thread(new Runnable() {
- public void run() {
- for (int i = 0; i < 500; i++) {
- r.put("T3_KEY_" + i, "VALUE_" + i);
- }
+ Thread thread3 = new Thread(() -> {
+ for (int i = 0; i < NUMBER_PUTS; i++) {
+ region.put("T3_KEY_" + i, "VALUE_" + i);
}
});
- t1.start();
- t2.start();
- t3.start();
+ thread1.start();
+ thread2.start();
+ thread3.start();
- t1.join();
- t2.join();
- t3.join();
+ thread1.join();
+ thread2.join();
+ thread3.join();
}
- private static void resumePuts() {
- Cache cache = GemFireCacheImpl.getInstance();
- Region<String, String> r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- for (int i = 0; i < 100; i++) {
- r.put("NEWKEY_" + i, "NEWVALUE_" + i);
+ /**
+ * Invoked in serverVM
+ */
+ private void resumePuts() {
+ Cache cache = this.managementTestRule.getCache();
+ Region<String, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+
+ for (int i = 0; i < NUMBER_PUTS; i++) {
+ region.put("NEWKEY_" + i, "NEWVALUE_" + i);
}
- r.put("last_key", "last_value");
+ region.put("last_key", "last_value");
}
- private static void waitForLastKey() {
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return lastKeyReceived;
- }
+ /**
+ * Invoked in managerVM
+ */
+ private void verifyClientStats(final DistributedMember serverMember, final int serverPort, final int numSubscriptions) throws Exception {
+ ManagementService service = this.managementTestRule.getManagementService();
+ CacheServerMXBean cacheServerMXBean = awaitCacheServerMXBean(serverMember, serverPort);
- @Override
- public String description() {
- return "Did not receive last key.";
- }
- };
- Wait.waitForCriterion(wc, 60 * 1000, 500, true);
- }
+ String[] clientIds = cacheServerMXBean.getClientIds();
+ assertThat(clientIds).hasSize(2);
- private static DistributedMember getMember() throws Exception {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- return cache.getDistributedSystem().getDistributedMember();
- }
+ ClientHealthStatus[] clientStatuses = cacheServerMXBean.showAllClientStats();
- private static void verifyClientStats(DistributedMember serverMember, int serverPort,
- int numSubscriptions) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- try {
- ManagementService service = ManagementService.getExistingManagementService(cache);
- CacheServerMXBean bean = MBeanUtil.getCacheServerMbeanProxy(serverMember, serverPort);
-
- String[] clientIds = bean.getClientIds();
- assertTrue(clientIds.length == 2);
- System.out.println(
- "<ExpectedString> ClientId-1 of the Server is " + clientIds[0] + "</ExpectedString> ");
- System.out.println(
- "<ExpectedString> ClientId-2 of the Server is " + clientIds[1] + "</ExpectedString> ");
-
- ClientHealthStatus[] clientStatuses = bean.showAllClientStats();
-
- ClientHealthStatus clientStatus1 = bean.showClientStats(clientIds[0]);
- ClientHealthStatus clientStatus2 = bean.showClientStats(clientIds[1]);
- assertNotNull(clientStatus1);
- assertNotNull(clientStatus2);
- System.out.println("<ExpectedString> ClientStats-1 of the Server is " + clientStatus1
- + "</ExpectedString> ");
- System.out.println("<ExpectedString> ClientStats-2 of the Server is " + clientStatus2
- + "</ExpectedString> ");
-
- System.out
- .println("<ExpectedString> clientStatuses " + clientStatuses + "</ExpectedString> ");
- assertNotNull(clientStatuses);
-
- assertTrue(clientStatuses.length == 2);
- for (ClientHealthStatus status : clientStatuses) {
- System.out.println(
- "<ExpectedString> ClientStats of the Server is " + status + "</ExpectedString> ");
- }
+ ClientHealthStatus clientStatus1 = cacheServerMXBean.showClientStats(clientIds[0]);
+ ClientHealthStatus clientStatus2 = cacheServerMXBean.showClientStats(clientIds[1]);
+ assertThat(clientStatus1).isNotNull();
+ assertThat(clientStatus2).isNotNull();
- DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
- assertEquals(2, dsBean.getNumClients());
- assertEquals(numSubscriptions, dsBean.getNumSubscriptions());
+ assertThat(clientStatuses).isNotNull().hasSize(2);
- } catch (Exception e) {
- fail("Error while verifying cache server from remote member", e);
- }
+ DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
+ assertThat(dsBean.getNumClients()).isEqualTo(2);
+ assertThat(dsBean.getNumSubscriptions()).isEqualTo(numSubscriptions);
}
- private static void put() {
- Cache cache = GemFireCacheImpl.getInstance();
- Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- assertNotNull(r1);
-
- r1.put(k1, client_k1);
- assertEquals(r1.getEntry(k1).getValue(), client_k1);
- r1.put(k2, client_k2);
- assertEquals(r1.getEntry(k2).getValue(), client_k2);
- try {
- Thread.sleep(10000);
- } catch (Exception e) {
- // sleep
- }
- r1.clear();
-
- r1.put(k1, client_k1);
- assertEquals(r1.getEntry(k1).getValue(), client_k1);
- r1.put(k2, client_k2);
- assertEquals(r1.getEntry(k2).getValue(), client_k2);
- r1.clear();
- try {
- Thread.sleep(10000);
- } catch (Exception e) {
- // sleep
- }
+ /**
+ * Invoked in client1VM and client2VM
+ */
+ private void put() throws Exception {
+ Cache cache = (Cache)clientCache;
+ Region<String, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+
+ region.put(KEY1, VALUE1);
+ assertThat(region.getEntry(KEY1).getValue()).isEqualTo(VALUE1);
+
+ region.put(KEY2, VALUE2);
+ assertThat(region.getEntry(KEY2).getValue()).isEqualTo(VALUE2);
+
+ region.clear();
+
+ region.put(KEY1, VALUE1);
+ assertThat(region.getEntry(KEY1).getValue()).isEqualTo(VALUE1);
+
+ region.put(KEY2, VALUE2);
+ assertThat(region.getEntry(KEY2).getValue()).isEqualTo(VALUE2);
+
+ region.clear();
}
- private static void verifyStats(int serverPort) throws Exception {
- Cache cache = GemFireCacheImpl.getInstance();
- ManagementService service = ManagementService.getExistingManagementService(cache);
+ /**
+ * Invoked in serverVM
+ */
+ private void verifyStats(final int serverPort) throws Exception {
+ ManagementService service = this.managementTestRule.getManagementService();
CacheServerMXBean serverBean = service.getLocalCacheServerMXBean(serverPort);
- CacheClientNotifier ccn = CacheClientNotifier.getInstance();
- CacheClientProxy ccp = ccn.getClientProxies().iterator().next();
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG, "getQueueSize() " + ccp.getQueueSize());
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
- "getQueueSizeStat() " + ccp.getQueueSizeStat());
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
- "getEventsEnqued() " + ccp.getHARegionQueue().getStatistics().getEventsEnqued());
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
- "getEventsDispatched() " + ccp.getHARegionQueue().getStatistics().getEventsDispatched());
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
- "getEventsRemoved() " + ccp.getHARegionQueue().getStatistics().getEventsRemoved());
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
- "getNumVoidRemovals() " + ccp.getHARegionQueue().getStatistics().getNumVoidRemovals());
- assertEquals(ccp.getQueueSize(), ccp.getQueueSizeStat());
+
+ CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
+ CacheClientProxy clientProxy = clientNotifier.getClientProxies().iterator().next();
+ assertThat(clientProxy.getQueueSizeStat()).isEqualTo(clientProxy.getQueueSize());
+
ClientQueueDetail queueDetails = serverBean.showClientQueueDetails()[0];
- assertEquals(queueDetails.getQueueSize(), ccp.getQueueSizeStat());
+ assertThat(clientProxy.getQueueSizeStat()).isEqualTo((int)queueDetails.getQueueSize());
+ }
+
+ private CacheServerMXBean awaitCacheServerMXBean(final DistributedMember serverMember, final int port) {
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
+ ObjectName objectName = service.getCacheServerMBeanName(port, serverMember);
+
+ await().until(() -> assertThat(service.getMBeanProxy(objectName, CacheServerMXBean.class)).isNotNull());
+
+ return service.getMBeanProxy(objectName, CacheServerMXBean.class);
+ }
+
+ private ConditionFactory await() {
+ return Awaitility.await().atMost(2, MINUTES);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/c3586a96/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java
index 9c33003..3176bda 100644
--- a/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java
@@ -14,163 +14,86 @@
*/
package org.apache.geode.management;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
+import static java.util.concurrent.TimeUnit.*;
+import static org.assertj.core.api.Assertions.*;
-import static org.junit.Assert.*;
+import java.io.Serializable;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
-
-import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.core.ConditionFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
import org.apache.geode.management.internal.MBeanJMXAdapter;
-import org.apache.geode.management.internal.ManagementConstants;
import org.apache.geode.management.internal.SystemManagementService;
-import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.junit.categories.DistributedTest;
@Category(DistributedTest.class)
-public class CompositeTypeTestDUnitTest extends ManagementTestBase {
+@SuppressWarnings({ "serial", "unused" })
+public class CompositeTypeTestDUnitTest implements Serializable {
- public CompositeTypeTestDUnitTest() {
- super();
- // TODO Auto-generated constructor stub
- }
+ @Manager
+ private VM managerVM;
- /**
- *
- */
- private static final long serialVersionUID = 1L;
+ @Member
+ private VM memberVM;
- private static ObjectName objectName;
+ @Rule
+ public ManagementTestRule managementTestRule = ManagementTestRule.builder().start(true).build();
- @Category(FlakyTest.class) // GEODE-1492
@Test
- public void testCompositeTypeGetters() throws Exception {
+ public void testCompositeTypeGetters() throws Exception{
+ registerMBeanWithCompositeTypeGetters(this.memberVM);
- initManagement(false);
- String member = getMemberId(managedNode1);
- member = MBeanJMXAdapter.makeCompliantName(member);
-
- registerMBeanWithCompositeTypeGetters(managedNode1, member);
+ String memberName = MBeanJMXAdapter.makeCompliantName(getMemberId(this.memberVM));
+ verifyMBeanWithCompositeTypeGetters(this.managerVM, memberName);
+ }
+ private void registerMBeanWithCompositeTypeGetters(final VM memberVM) throws Exception {
+ memberVM.invoke("registerMBeanWithCompositeTypeGetters", () -> {
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
- checkMBeanWithCompositeTypeGetters(managingNode, member);
+ ObjectName objectName = new ObjectName("GemFire:service=custom,type=composite");
+ CompositeTestMXBean compositeTestMXBean = new CompositeTestMBean();
+ objectName = service.registerMBean(compositeTestMXBean, objectName);
+ service.federate(objectName, CompositeTestMXBean.class, false);
+ });
}
+ private void verifyMBeanWithCompositeTypeGetters(final VM managerVM, final String memberId) throws Exception {
+ managerVM.invoke("verifyMBeanWithCompositeTypeGetters", () -> {
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
+ ObjectName objectName = new ObjectName("GemFire:service=custom,type=composite,member=" + memberId);
- /**
- * Creates a Local region
- *
- * @param vm reference to VM
- */
- protected void registerMBeanWithCompositeTypeGetters(VM vm, final String memberID)
- throws Exception {
- SerializableRunnable regMBean =
- new SerializableRunnable("Register CustomMBean with composite Type") {
- public void run() {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- SystemManagementService service = (SystemManagementService) getManagementService();
-
- try {
- ObjectName objectName = new ObjectName("GemFire:service=custom,type=composite");
- CompositeTestMXBean mbean = new CompositeTestMBean();
- objectName = service.registerMBean(mbean, objectName);
- service.federate(objectName, CompositeTestMXBean.class, false);
- } catch (MalformedObjectNameException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (NullPointerException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
-
-
- }
- };
- vm.invoke(regMBean);
- }
+ await().until(() -> service.getMBeanInstance(objectName, CompositeTestMXBean.class) != null);
+
+ CompositeTestMXBean compositeTestMXBean = service.getMBeanInstance(objectName, CompositeTestMXBean.class);
+ assertThat(compositeTestMXBean).isNotNull();
+ CompositeStats listCompositeStatsData = compositeTestMXBean.listCompositeStats();
+ assertThat(listCompositeStatsData).isNotNull();
- /**
- * Creates a Local region
- *
- * @param vm reference to VM
- */
- protected void checkMBeanWithCompositeTypeGetters(VM vm, final String memberID) throws Exception {
- SerializableRunnable checkMBean =
- new SerializableRunnable("Check CustomMBean with composite Type") {
- public void run() {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- final SystemManagementService service =
- (SystemManagementService) getManagementService();
-
- try {
- final ObjectName objectName =
- new ObjectName("GemFire:service=custom,type=composite,member=" + memberID);
-
- Wait.waitForCriterion(new WaitCriterion() {
- public String description() {
- return "Waiting for Composite Type MBean";
- }
-
- public boolean done() {
- CompositeTestMXBean bean =
- service.getMBeanInstance(objectName, CompositeTestMXBean.class);
- boolean done = (bean != null);
- return done;
- }
-
- }, ManagementConstants.REFRESH_TIME * 4, 500, true);
-
-
- CompositeTestMXBean bean =
- service.getMBeanInstance(objectName, CompositeTestMXBean.class);
-
- CompositeStats listData = bean.listCompositeStats();
-
- System.out.println("connectionStatsType = " + listData.getConnectionStatsType());
- System.out.println("connectionsOpened = " + listData.getConnectionsOpened());
- System.out.println("connectionsClosed = " + listData.getConnectionsClosed());
- System.out.println("connectionsAttempted = " + listData.getConnectionsAttempted());
- System.out.println("connectionsFailed = " + listData.getConnectionsFailed());
-
- CompositeStats getsData = bean.getCompositeStats();
- System.out.println("connectionStatsType = " + getsData.getConnectionStatsType());
- System.out.println("connectionsOpened = " + getsData.getConnectionsOpened());
- System.out.println("connectionsClosed = " + getsData.getConnectionsClosed());
- System.out.println("connectionsAttempted = " + getsData.getConnectionsAttempted());
- System.out.println("connectionsFailed = " + getsData.getConnectionsFailed());
-
- CompositeStats[] arrayData = bean.getCompositeArray();
- Integer[] intArrayData = bean.getIntegerArray();
- Thread.sleep(2 * 60 * 1000);
- } catch (MalformedObjectNameException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (NullPointerException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
-
-
- }
- };
- vm.invoke(checkMBean);
+ CompositeStats getCompositeStatsData = compositeTestMXBean.getCompositeStats();
+ assertThat(getCompositeStatsData).isNotNull();
+
+ CompositeStats[] getCompositeArrayData = compositeTestMXBean.getCompositeArray();
+ assertThat(getCompositeArrayData).isNotNull().isNotEmpty();
+
+ Integer[] getIntegerArrayData = compositeTestMXBean.getIntegerArray();
+ assertThat(getIntegerArrayData).isNotNull().isNotEmpty();
+ });
}
+ private String getMemberId(final VM memberVM) {
+ return memberVM.invoke("getMemberId", () -> this.managementTestRule.getDistributedMember().getId());
+ }
+ private ConditionFactory await() {
+ return Awaitility.await().atMost(2, MINUTES);
+ }
}
http://git-wip-us.apache.org/repos/asf/geode/blob/c3586a96/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java
index 86501c3..9c8e5c9 100644
--- a/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java
@@ -14,452 +14,272 @@
*/
package org.apache.geode.management;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.geode.internal.process.ProcessUtils.*;
+import static org.assertj.core.api.Assertions.*;
+import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import javax.management.ObjectName;
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.core.ConditionFactory;
+import org.junit.Rule;
+import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.locks.DLockService;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.management.internal.MBeanJMXAdapter;
import org.apache.geode.management.internal.SystemManagementService;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.FlakyTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
@Category(DistributedTest.class)
-public class DLockManagementDUnitTest extends ManagementTestBase {
+@SuppressWarnings({ "serial", "unused" })
+public class DLockManagementDUnitTest implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final int MAX_WAIT_MILLIS = 120 * 1000; // 2 MINUTES
- private static final String LOCK_SERVICE_NAME = "testLockService";
+ private static final String LOCK_SERVICE_NAME = DLockManagementDUnitTest.class.getSimpleName() + "_testLockService";
- // This must be bigger than the dunit ack-wait-threshold for the revoke
- // tests. The command line is setting the ack-wait-threshold to be
- // 60 seconds.
- private static final int MAX_WAIT = 70 * 1000;
+ @Manager
+ private VM managerVM;
- public DLockManagementDUnitTest() {
- super();
+ @Member
+ private VM[] memberVMs;
- }
+ @Rule
+ public ManagementTestRule managementTestRule = ManagementTestRule.builder().managersFirst(false).start(true).build();
- /**
- * Distributed Lock Service test
- *
- * @throws Exception
- */
- @Category(FlakyTest.class) // GEODE-173: eats exceptions, HeadlessGFSH, time sensitive,
- // waitForCriterions
@Test
- public void testDLockMBean() throws Throwable {
+ public void testLockServiceMXBean() throws Throwable {
+ createLockServiceGrantor(this.memberVMs[0]);
+ createLockService(this.memberVMs[1]);
+ createLockService(this.memberVMs[2]);
- initManagement(false);
-
- VM[] managedNodes = new VM[getManagedNodeList().size()];
- VM managingNode = getManagingNode();
-
- getManagedNodeList().toArray(managedNodes);
-
- createGrantorLockService(managedNodes[0]);
-
- createLockService(managedNodes[1]);
-
- createLockService(managedNodes[2]);
-
- for (VM vm : getManagedNodeList()) {
- verifyLockData(vm);
+ for (VM memberVM : this.memberVMs) {
+ verifyLockServiceMXBeanInMember(memberVM);
}
- verifyLockDataRemote(managingNode);
+ verifyLockServiceMXBeanInManager(this.managerVM);
- for (VM vm : getManagedNodeList()) {
- closeLockService(vm);
+ for (VM memberVM : this.memberVMs) {
+ closeLockService(memberVM);
}
}
- /**
- * Distributed Lock Service test
- *
- * @throws Exception
- */
- @Category(FlakyTest.class) // GEODE-553: waitForCriterion, eats exceptions, HeadlessGFSH
@Test
- public void testDLockAggregate() throws Throwable {
- initManagement(false);
- VM[] managedNodes = new VM[getManagedNodeList().size()];
- VM managingNode = getManagingNode();
-
- getManagedNodeList().toArray(managedNodes);
-
- createGrantorLockService(managedNodes[0]);
+ public void testDistributedLockServiceMXBean() throws Throwable {
+ createLockServiceGrantor(this.memberVMs[0]);
+ createLockService(this.memberVMs[1]);
+ createLockService(this.memberVMs[2]);
- createLockService(managedNodes[1]);
+ verifyDistributedLockServiceMXBean(this.managerVM, 3);
- createLockService(managedNodes[2]);
+ DistributedMember member = this.managementTestRule.getDistributedMember(this.memberVMs[2]);
+ verifyFetchOperations(this.managerVM, member);
- checkAggregate(managingNode, 3);
- DistributedMember member = getMember(managedNodes[2]);
- checkNavigation(managingNode, member);
+ createLockService(this.managerVM);
+ verifyDistributedLockServiceMXBean(this.managerVM, 4);
- createLockService(managingNode);
- checkAggregate(managingNode, 4);
-
-
- for (VM vm : getManagedNodeList()) {
- closeLockService(vm);
+ for (VM memberVM : this.memberVMs) {
+ closeLockService(memberVM);
}
- ensureProxyCleanup(managingNode);
- checkAggregate(managingNode, 1);
- closeLockService(managingNode);
- checkAggregate(managingNode, 0);
+ verifyProxyCleanupInManager(this.managerVM);
+ verifyDistributedLockServiceMXBean(this.managerVM, 1);
+ closeLockService(this.managerVM);
+ verifyDistributedLockServiceMXBean(this.managerVM, 0);
}
- public void ensureProxyCleanup(final VM vm) {
-
- SerializableRunnable ensureProxyCleanup = new SerializableRunnable("Ensure Proxy cleanup") {
- public void run() {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- Set<DistributedMember> otherMemberSet =
- cache.getDistributionManager().getOtherNormalDistributionManagerIds();
- final SystemManagementService service = (SystemManagementService) getManagementService();
-
-
- for (final DistributedMember member : otherMemberSet) {
- RegionMXBean bean = null;
- try {
-
- Wait.waitForCriterion(new WaitCriterion() {
-
- LockServiceMXBean bean = null;
-
- public String description() {
- return "Waiting for the proxy to get deleted at managing node";
- }
-
- public boolean done() {
- ObjectName objectName = service.getRegionMBeanName(member, LOCK_SERVICE_NAME);
- bean = service.getMBeanProxy(objectName, LockServiceMXBean.class);
- boolean done = (bean == null);
- return done;
- }
-
- }, MAX_WAIT, 500, true);
-
- } catch (Exception e) {
- throw new AssertionError("could not remove proxies in required time", e);
-
- }
- assertNull(bean);
-
- }
+ private void verifyProxyCleanupInManager(final VM managerVM) {
+ managerVM.invoke("verifyProxyCleanupInManager", () -> {
+ Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers();
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
+ for (final DistributedMember member : otherMembers) {
+ ObjectName objectName = service.getRegionMBeanName(member, LOCK_SERVICE_NAME);
+ await().until(() -> assertThat(service.getMBeanProxy(objectName, LockServiceMXBean.class)).isNull());
}
- };
- vm.invoke(ensureProxyCleanup);
+ });
}
- /**
- * Creates a grantor lock service
- *
- * @param vm
- */
- @SuppressWarnings("serial")
- protected void createGrantorLockService(final VM vm) {
- SerializableRunnable createGrantorLockService =
- new SerializableRunnable("Create Grantor LockService") {
- public void run() {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- assertNull(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME));
-
- DLockService service = (DLockService) DistributedLockService.create(LOCK_SERVICE_NAME,
- cache.getDistributedSystem());
+ private void createLockServiceGrantor(final VM memberVM) {
+ memberVM.invoke("createLockServiceGrantor", () -> {
+ assertThat(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)).isNull();
- assertSame(service, DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME));
+ DLockService lockService = (DLockService) DistributedLockService.create(LOCK_SERVICE_NAME, this.managementTestRule.getCache().getDistributedSystem());
+ DistributedMember grantor = lockService.getLockGrantorId().getLockGrantorMember();
+ assertThat(grantor).isNotNull();
- InternalDistributedMember grantor = service.getLockGrantorId().getLockGrantorMember();
+ LockServiceMXBean lockServiceMXBean = awaitLockServiceMXBean(LOCK_SERVICE_NAME);
- assertNotNull(grantor);
+ assertThat(lockServiceMXBean).isNotNull();
+ assertThat(lockServiceMXBean.isDistributed()).isTrue();
+ assertThat(lockServiceMXBean.getName()).isEqualTo(LOCK_SERVICE_NAME);
+ assertThat(lockServiceMXBean.isLockGrantor()).isTrue();
+ assertThat(lockServiceMXBean.fetchGrantorMember()).isEqualTo(this.managementTestRule.getDistributedMember().getId());
+ });
+ }
- LogWriterUtils.getLogWriter().info("In identifyLockGrantor - grantor is " + grantor);
+ private void createLockService(final VM anyVM) {
+ anyVM.invoke("createLockService", () -> {
+ assertThat(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)).isNull();
+ DistributedLockService.create(LOCK_SERVICE_NAME, this.managementTestRule.getCache().getDistributedSystem());
+ LockServiceMXBean lockServiceMXBean = awaitLockServiceMXBean(LOCK_SERVICE_NAME);
- ManagementService mgmtService = getManagementService();
+ assertThat(lockServiceMXBean).isNotNull();
+ assertThat(lockServiceMXBean.isDistributed()).isTrue();
+ assertThat(lockServiceMXBean.isLockGrantor()).isFalse();
+ });
+ }
- LockServiceMXBean bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
+ private void closeLockService(final VM anyVM) {
+ anyVM.invoke("closeLockService", () -> {
+ assertThat(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)).isNotNull();
+ DistributedLockService.destroy(LOCK_SERVICE_NAME);
- assertNotNull(bean);
+ awaitLockServiceMXBeanIsNull(LOCK_SERVICE_NAME);
- assertTrue(bean.isDistributed());
+ ManagementService service = this.managementTestRule.getManagementService();
+ LockServiceMXBean lockServiceMXBean = service.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
+ assertThat(lockServiceMXBean).isNull();
+ });
+ }
- assertEquals(bean.getName(), LOCK_SERVICE_NAME);
+ private void verifyLockServiceMXBeanInMember(final VM memberVM) {
+ memberVM.invoke("verifyLockServiceMXBeanInManager", () -> {
+ DistributedLockService lockService = DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME);
+ lockService.lock("lockObject_" + identifyPid(), MAX_WAIT_MILLIS, -1);
- assertTrue(bean.isLockGrantor());
+ ManagementService service = this.managementTestRule.getManagementService();
+ LockServiceMXBean lockServiceMXBean = service.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
+ assertThat(lockServiceMXBean).isNotNull();
- assertEquals(cache.getDistributedSystem().getMemberId(), bean.fetchGrantorMember());
+ String[] listHeldLock = lockServiceMXBean.listHeldLocks();
+ assertThat(listHeldLock).hasSize(1);
- }
- };
- vm.invoke(createGrantorLockService);
+ Map<String, String> lockThreadMap = lockServiceMXBean.listThreadsHoldingLock();
+ assertThat(lockThreadMap).hasSize(1);
+ });
}
/**
- * Creates a named lock service
- *
- * @param vm
+ * Verify lock data from remote Managing node
*/
- @SuppressWarnings("serial")
- protected void createLockService(final VM vm) {
- SerializableRunnable createLockService = new SerializableRunnable("Create LockService") {
- public void run() {
- assertNull(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME));
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- DistributedLockService service =
- DistributedLockService.create(LOCK_SERVICE_NAME, cache.getDistributedSystem());
+ private void verifyLockServiceMXBeanInManager(final VM managerVM) throws Exception {
+ managerVM.invoke("verifyLockServiceMXBeanInManager", () -> {
+ Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers();
- assertSame(service, DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME));
+ for (DistributedMember member : otherMembers) {
+ LockServiceMXBean lockServiceMXBean = awaitLockServiceMXBeanProxy(member, LOCK_SERVICE_NAME);
+ assertThat(lockServiceMXBean).isNotNull();
+ String[] listHeldLock = lockServiceMXBean.listHeldLocks();
+ assertThat(listHeldLock).hasSize(1);
+ Map<String, String> lockThreadMap = lockServiceMXBean.listThreadsHoldingLock();
+ assertThat(lockThreadMap).hasSize(1);
+ }
+ });
+ }
- ManagementService mgmtService = getManagementService();
-
- LockServiceMXBean bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
-
- assertNotNull(bean);
+ private void verifyFetchOperations(final VM memberVM, final DistributedMember member) {
+ memberVM.invoke("verifyFetchOperations", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
- assertTrue(bean.isDistributed());
+ DistributedSystemMXBean distributedSystemMXBean = awaitDistributedSystemMXBean();
+ ObjectName distributedLockServiceMXBeanName = MBeanJMXAdapter.getDistributedLockServiceName(LOCK_SERVICE_NAME);
+ assertThat(distributedSystemMXBean.fetchDistributedLockServiceObjectName(LOCK_SERVICE_NAME)).isEqualTo(distributedLockServiceMXBeanName);
- assertFalse(bean.isLockGrantor());
- }
- };
- vm.invoke(createLockService);
+ ObjectName lockServiceMXBeanName = MBeanJMXAdapter.getLockServiceMBeanName(member.getId(), LOCK_SERVICE_NAME);
+ assertThat(distributedSystemMXBean.fetchLockServiceObjectName(member.getId(), LOCK_SERVICE_NAME)).isEqualTo(lockServiceMXBeanName);
+ });
}
/**
- * Closes a named lock service
- *
- * @param vm
+ * Verify Aggregate MBean
*/
- @SuppressWarnings("serial")
- protected void closeLockService(final VM vm) {
- SerializableRunnable closeLockService = new SerializableRunnable("Close LockService") {
- public void run() {
-
- DistributedLockService service = DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME);
-
- DistributedLockService.destroy(LOCK_SERVICE_NAME);
+ private void verifyDistributedLockServiceMXBean(final VM managerVM, final int memberCount) {
+ managerVM.invoke("verifyDistributedLockServiceMXBean", () -> {
+ ManagementService service = this.managementTestRule.getManagementService();
- ManagementService mgmtService = getManagementService();
-
- LockServiceMXBean bean = null;
- try {
+ if (memberCount == 0) {
+ await().until(() -> assertThat(service.getDistributedLockServiceMXBean(LOCK_SERVICE_NAME)).isNull());
+ return;
+ }
- bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
+ DistributedLockServiceMXBean distributedLockServiceMXBean = awaitDistributedLockServiceMXBean(LOCK_SERVICE_NAME, memberCount);
+ assertThat(distributedLockServiceMXBean).isNotNull();
+ assertThat(distributedLockServiceMXBean.getName()).isEqualTo(LOCK_SERVICE_NAME);
+ });
+ }
- } catch (ManagementException mgs) {
+ private DistributedSystemMXBean awaitDistributedSystemMXBean() {
+ ManagementService service = this.managementTestRule.getManagementService();
- }
- assertNull(bean);
+ await().until(() -> assertThat(service.getDistributedSystemMXBean()).isNotNull());
- }
- };
- vm.invoke(closeLockService);
+ return service.getDistributedSystemMXBean();
}
/**
- * Lock data related verifications
- *
- * @param vm
+ * Await and return a DistributedRegionMXBean proxy with specified member
+ * count.
*/
- @SuppressWarnings("serial")
- protected void verifyLockData(final VM vm) {
- SerializableRunnable verifyLockData = new SerializableRunnable("Verify LockService") {
- public void run() {
-
- DistributedLockService service = DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME);
-
- final String LOCK_OBJECT = "lockObject_" + vm.getPid();
-
- Wait.waitForCriterion(new WaitCriterion() {
- DistributedLockService service = null;
-
- public String description() {
- return "Waiting for the lock service to be initialised";
- }
-
- public boolean done() {
- DistributedLockService service =
- DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME);
- boolean done = service != null;
- return done;
- }
-
- }, MAX_WAIT, 500, true);
-
- service.lock(LOCK_OBJECT, 1000, -1);
-
+ private DistributedLockServiceMXBean awaitDistributedLockServiceMXBean(final String lockServiceName, final int memberCount) {
+ ManagementService service = this.managementTestRule.getManagementService();
- ManagementService mgmtService = getManagementService();
+ await().until(() -> {
+ assertThat(service.getDistributedLockServiceMXBean(lockServiceName)).isNotNull();
+ assertThat(service.getDistributedLockServiceMXBean(lockServiceName).getMemberCount()).isEqualTo(memberCount);
+ });
- LockServiceMXBean bean = null;
- try {
-
- bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME);
-
- } catch (ManagementException mgs) {
-
- }
- assertNotNull(bean);
- String[] listHeldLock = bean.listHeldLocks();
- assertEquals(listHeldLock.length, 1);
- LogWriterUtils.getLogWriter().info("List Of Lock Object is " + listHeldLock[0]);
- Map<String, String> lockThreadMap = bean.listThreadsHoldingLock();
- assertEquals(lockThreadMap.size(), 1);
- LogWriterUtils.getLogWriter().info("List Of Lock Thread is " + lockThreadMap.toString());
- }
- };
- vm.invoke(verifyLockData);
+ return service.getDistributedLockServiceMXBean(lockServiceName);
}
/**
- * Verify lock data from remote Managing node
- *
- * @param vm
+ * Await and return a LockServiceMXBean proxy for a specific member and
+ * lockServiceName.
*/
- @SuppressWarnings("serial")
- protected void verifyLockDataRemote(final VM vm) {
- SerializableRunnable verifyLockDataRemote =
- new SerializableRunnable("Verify LockService Remote") {
- public void run() {
-
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- Set<DistributedMember> otherMemberSet =
- cache.getDistributionManager().getOtherNormalDistributionManagerIds();
-
- for (DistributedMember member : otherMemberSet) {
- LockServiceMXBean bean = null;
- try {
- bean = MBeanUtil.getLockServiceMbeanProxy(member, LOCK_SERVICE_NAME);
- } catch (Exception e) {
- InternalDistributedSystem.getLoggerI18n()
- .fine("Undesired Result , LockServiceMBean Should not be null", e);
-
- }
- assertNotNull(bean);
- String[] listHeldLock = bean.listHeldLocks();
- assertEquals(listHeldLock.length, 1);
- LogWriterUtils.getLogWriter().info("List Of Lock Object is " + listHeldLock[0]);
- Map<String, String> lockThreadMap = bean.listThreadsHoldingLock();
- assertEquals(lockThreadMap.size(), 1);
- LogWriterUtils.getLogWriter()
- .info("List Of Lock Thread is " + lockThreadMap.toString());
- }
-
- }
- };
- vm.invoke(verifyLockDataRemote);
- }
+ private LockServiceMXBean awaitLockServiceMXBeanProxy(final DistributedMember member, final String lockServiceName) {
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
+ ObjectName lockServiceMXBeanName = service.getLockServiceMBeanName(member, lockServiceName);
- protected void checkNavigation(final VM vm, final DistributedMember lockServiceMember) {
- SerializableRunnable checkNavigation = new SerializableRunnable("Check Navigation") {
- public void run() {
-
- final ManagementService service = getManagementService();
-
- DistributedSystemMXBean disMBean = service.getDistributedSystemMXBean();
- try {
- ObjectName expected = MBeanJMXAdapter.getDistributedLockServiceName(LOCK_SERVICE_NAME);
- ObjectName actual = disMBean.fetchDistributedLockServiceObjectName(LOCK_SERVICE_NAME);
- assertEquals(expected, actual);
- } catch (Exception e) {
- throw new AssertionError("Lock Service Navigation Failed ", e);
- }
-
- try {
- ObjectName expected =
- MBeanJMXAdapter.getLockServiceMBeanName(lockServiceMember.getId(), LOCK_SERVICE_NAME);
- ObjectName actual =
- disMBean.fetchLockServiceObjectName(lockServiceMember.getId(), LOCK_SERVICE_NAME);
- assertEquals(expected, actual);
- } catch (Exception e) {
- throw new AssertionError("Lock Service Navigation Failed ", e);
- }
+ await().until(() -> assertThat(service.getMBeanProxy(lockServiceMXBeanName, LockServiceMXBean.class)).isNotNull());
- }
- };
- vm.invoke(checkNavigation);
+ return service.getMBeanProxy(lockServiceMXBeanName, LockServiceMXBean.class);
}
/**
- * Verify Aggregate MBean
- *
- * @param vm
+ * Await creation of local LockServiceMXBean for specified lockServiceName.
*/
- @SuppressWarnings("serial")
- protected void checkAggregate(final VM vm, final int expectedMembers) {
- SerializableRunnable checkAggregate = new SerializableRunnable("Verify Aggregate MBean") {
- public void run() {
-
- final ManagementService service = getManagementService();
- if (expectedMembers == 0) {
- try {
- Wait.waitForCriterion(new WaitCriterion() {
-
- DistributedLockServiceMXBean bean = null;
+ private LockServiceMXBean awaitLockServiceMXBean(final String lockServiceName) {
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
- public String description() {
- return "Waiting for the proxy to get deleted at managing node";
- }
+ await().until(() -> assertThat(service.getLocalLockServiceMBean(lockServiceName)).isNotNull());
- public boolean done() {
- bean = service.getDistributedLockServiceMXBean(LOCK_SERVICE_NAME);
-
- boolean done = (bean == null);
- return done;
- }
-
- }, MAX_WAIT, 500, true);
-
- } catch (Exception e) {
- throw new AssertionError("could not remove Aggregate Bean in required time", e);
-
- }
- return;
- }
+ return service.getLocalLockServiceMBean(lockServiceName);
+ }
- DistributedLockServiceMXBean bean = null;
- try {
- bean = MBeanUtil.getDistributedLockMbean(LOCK_SERVICE_NAME, expectedMembers);
- } catch (Exception e) {
- InternalDistributedSystem.getLoggerI18n()
- .fine("Undesired Result , LockServiceMBean Should not be null", e);
+ /**
+ * Await destruction of local LockServiceMXBean for specified
+ * lockServiceName.
+ */
+ private void awaitLockServiceMXBeanIsNull(final String lockServiceName) {
+ SystemManagementService service = this.managementTestRule.getSystemManagementService();
- }
- assertNotNull(bean);
- assertEquals(bean.getName(), LOCK_SERVICE_NAME);
+ await().until(() -> assertThat(service.getLocalLockServiceMBean(lockServiceName)).isNull());
+ }
- }
- };
- vm.invoke(checkAggregate);
+ private ConditionFactory await() {
+ return Awaitility.await().atMost(MAX_WAIT_MILLIS, MILLISECONDS);
}
}
+