You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/13 18:37:27 UTC
[17/32] incubator-geode git commit: GEODE-1376: Cleaned up server
port to be '0'. Cleaned up some legacy code and replaced with cleaner Lambda
invocation. Replaced Wait.waitForCriterion with Awaitility.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c33efb60/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java
index 9ef87d2..176fbea 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java
@@ -16,17 +16,6 @@
*/
package com.gemstone.gemfire.cache.client.internal;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Assert;
-
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.locator.ClientConnectionRequest;
@@ -50,24 +39,31 @@ import com.gemstone.gemfire.internal.logging.LocalLogWriter;
import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
import com.gemstone.gemfire.test.dunit.NetworkUtils;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.jayway.awaitility.Awaitility;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
*
*/
public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
-
+
/**
* The number of connections that we can be off by in the balancing tests.
* We need this little fudge factor, because the locator can receive an update
* from the bridge server after it has incremented its counter for a client
* connection, but the client hasn't connected yet. This wipes out the estimation
* on the locator. This means that we may be slightly off in our balance.
- *
+ * <p>
* TODO grid fix this hole in the locator.
*/
private static final int ALLOWABLE_ERROR_IN_COUNT = 1;
@@ -78,149 +74,144 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
}
/**
- * Test the locator discovers a bridge server and is initialized with
+ * Test the locator discovers a bridge server and is initialized with
* the correct load for that bridge server.
*/
public void testDiscovery() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
- VM vm2 = host.getVM(2);
-// vm0.invoke(new SerializableRunnable() {
-// public void run() {
-// System.setProperty("gemfire.DistributionAdvisor.VERBOSE", "true");
-// }
-// });
-
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- startLocatorInVM(vm0, locatorPort, "");
-
+ vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
+
String locators = getLocatorString(host, locatorPort);
-
- int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
-
+
+ int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
+
ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(vm0
.getHost()), serverPort);
Map expected = new HashMap();
expected.put(expectedLocation, expectedLoad);
-
- checkLocatorLoad(vm0, expected);
-
- int serverPort2 = startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators);
-
+
+ vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
+
+ int serverPort2 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
+
ServerLocation expectedLocation2 = new ServerLocation(NetworkUtils.getServerHostName(vm0
.getHost()), serverPort2);
-
+
expected.put(expectedLocation2, expectedLoad);
- checkLocatorLoad(vm0, expected);
+ vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
}
-
+
/**
* Test that the locator will properly estimate the load for servers when
- * it receives connection requests.
+ * it receives connection requests.
*/
- public void testEstimation() throws UnknownHostException, IOException, ClassNotFoundException {
+ public void testEstimation() throws IOException, ClassNotFoundException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
-
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- startLocatorInVM(vm0, locatorPort, "");
+ vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
String locators = getLocatorString(host, locatorPort);
-
- int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
-
- ServerLoad expectedLoad = new ServerLoad(2/800f, 1 / 800.0f, 0f, 1f);
+
+ int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
+
+ ServerLoad expectedLoad = new ServerLoad(2 / 800f, 1 / 800.0f, 0f, 1f);
ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort);
Map expected = new HashMap();
expected.put(expectedLocation, expectedLoad);
-
+
ClientConnectionResponse response;
response = (ClientConnectionResponse) TcpClient.requestToServer(InetAddress
- .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
+ .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000);
Assert.assertEquals(expectedLocation, response.getServer());
-
+
response = (ClientConnectionResponse) TcpClient.requestToServer(InetAddress
- .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
+ .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000, true);
Assert.assertEquals(expectedLocation, response.getServer());
-
+
//we expect that the connection load will be 2 * the loadPerConnection
- checkLocatorLoad(vm0, expected);
-
+ vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
+
QueueConnectionResponse response2;
response2 = (QueueConnectionResponse) TcpClient.requestToServer(InetAddress
- .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
+ .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
new QueueConnectionRequest(null, 2,
Collections.EMPTY_SET, null, false), 10000, true);
Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers());
-
+
response2 = (QueueConnectionResponse) TcpClient
.requestToServer(InetAddress.getByName(NetworkUtils.getServerHostName(host)),
locatorPort, new QueueConnectionRequest(null, 5, Collections.EMPTY_SET, null,
false), 10000, true);
-
+
Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers());
//we expect that the queue load will increase by 2
expectedLoad.setSubscriptionConnectionLoad(2f);
- checkLocatorLoad(vm0, expected);
+ vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
}
-
+
/**
* Test to make sure the bridge servers communicate
* their updated load to the controller when the load
* on the bridge server changes.
- * @throws Exception
+ *
+ * @throws Exception
*/
public void testLoadMessaging() throws Exception {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
-
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- startLocatorInVM(vm0, locatorPort, "");
+ vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
String locators = getLocatorString(host, locatorPort);
-
- final int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
-
+
+ final int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
+
//We expect 0 load
Map expected = new HashMap();
ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort);
ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
expected.put(expectedLocation, expectedLoad);
- checkLocatorLoad(vm0, expected);
-
+ vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
+
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addServer(NetworkUtils.getServerHostName(host), serverPort);
pf.setMinConnections(8);
pf.setMaxConnections(8);
pf.setSubscriptionEnabled(true);
- startBridgeClientInVM(vm2, pf.getPoolAttributes(), new String[] {REGION_NAME});
-
+ vm2.invoke("StartBridgeClient", () -> startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME }));
+
//We expect 8 client to server connections. The queue requires
//an additional client to server connection, but that shouldn't show up here.
- expectedLoad = new ServerLoad(8/800f, 1 / 800.0f, 1f, 1f);
+ expectedLoad = new ServerLoad(8 / 800f, 1 / 800.0f, 1f, 1f);
expected.put(expectedLocation, expectedLoad);
-
-
- checkLocatorLoad(vm0, expected);
-
+
+ vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
+
stopBridgeMemberVM(vm2);
-
+
//Now we expect 0 load
expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
expected.put(expectedLocation, expectedLoad);
- checkLocatorLoad(vm0, expected);
+ vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
}
-
+
/**
* Test to make sure that the locator
* balances load between two servers.
- * @throws Exception
+ *
+ * @throws Exception
*/
public void testBalancing() throws Exception {
final Host host = Host.getHost(0);
@@ -228,87 +219,60 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
-
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- startLocatorInVM(vm0, locatorPort, "");
+ vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
String locators = getLocatorString(host, locatorPort);
-
- startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
- startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators);
-
+
+ vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
+ vm2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
+
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort);
pf.setMinConnections(80);
pf.setMaxConnections(80);
pf.setSubscriptionEnabled(false);
pf.setIdleTimeout(-1);
- startBridgeClientInVM(vm3, pf.getPoolAttributes(), new String[] {REGION_NAME});
-
- waitForPrefilledConnections(vm3, 80);
-
- checkConnectionCount(vm1, 40);
- checkConnectionCount(vm2, 40);
+ vm3.invoke("StartBridgeClient", () -> startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME }));
+
+ vm3.invoke("waitForPrefilledConnections", () -> waitForPrefilledConnections(80));
+
+ vm1.invoke("check connection count", () -> checkConnectionCount(40));
+ vm2.invoke("check connection count", () -> checkConnectionCount(40));
}
- private void checkConnectionCount(VM vm, final int count) {
- SerializableRunnableIF checkConnectionCount = new SerializableRunnable("checkConnectionCount") {
- public void run() {
- Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
- final CacheServerImpl server = (CacheServerImpl)
- cache.getCacheServers().get(0);
- WaitCriterion wc = new WaitCriterion() {
- String excuse;
- public boolean done() {
- int sz = server.getAcceptor().getStats()
- .getCurrentClientConnections();
- if (Math.abs(sz - count) <= ALLOWABLE_ERROR_IN_COUNT) {
- return true;
- }
- excuse = "Found " + sz + " connections, expected " + count;
- return false;
- }
- public String description() {
- return excuse;
- }
- };
- Wait.waitForCriterion(wc, 5 * 60 * 1000, 1000, true);
+ private void checkConnectionCount(final int count) {
+ Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
+ final CacheServerImpl server = (CacheServerImpl)
+ cache.getCacheServers().get(0);
+ Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
+ .timeout(300, TimeUnit.SECONDS).until(() -> {
+ int sz = server.getAcceptor().getStats().getCurrentClientConnections();
+ if (Math.abs(sz - count) <= ALLOWABLE_ERROR_IN_COUNT) {
+ return true;
}
- };
-
- vm.invoke(checkConnectionCount);
+ System.out.println("Found " + sz + " connections, expected " + count);
+ return false;
+ });
}
-
- private void waitForPrefilledConnections(VM vm, final int count) throws Exception {
- waitForPrefilledConnections(vm, count, POOL_NAME);
+
+ private void waitForPrefilledConnections(final int count) throws Exception {
+ waitForPrefilledConnections(count, POOL_NAME);
}
- private void waitForPrefilledConnections(VM vm, final int count, final String poolName) throws Exception {
- SerializableRunnable runnable = new SerializableRunnable("waitForPrefilledConnections") {
- public void run() {
- final PoolImpl pool = (PoolImpl) PoolManager.getAll().get(poolName);
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return pool.getConnectionCount() >= count;
- }
- public String description() {
- return "connection count never reached " + count;
- }
- };
- Wait.waitForCriterion(ev, MAX_WAIT, 200, true);
- }
- };
- if(vm == null) {
- runnable.run();
- } else {
- vm.invoke(runnable);
- }
+ private void waitForPrefilledConnections(final int count, final String poolName) throws Exception {
+ final PoolImpl pool = (PoolImpl) PoolManager.getAll().get(poolName);
+ Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
+ .timeout(300, TimeUnit.SECONDS).until(() -> pool.getConnectionCount() >= count);
}
-
- /** Test that the locator balances load between
+
+ /**
+ * Test that the locator balances load between
* three servers with intersecting server groups.
* Server: 1 2 3
* Groups: a a,b b
- * @throws Exception
+ *
+ * @throws Exception
*/
public void testIntersectingServerGroups() throws Exception {
final Host host = Host.getHost(0);
@@ -316,175 +280,158 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
-
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- startLocatorInVM(vm0, locatorPort, "");
+ vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
String locators = getLocatorString(host, locatorPort);
-
- int serverPort1 = startBridgeServerInVM(vm1, new String[] {"a"}, locators);
- startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators);
- startBridgeServerInVM(vm3, new String[] {"b"}, locators);
-
+
+ int serverPort1 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a" }, locators));
+ vm2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
+ vm3.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "b" }, locators));
+
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort);
pf.setMinConnections(12);
pf.setSubscriptionEnabled(false);
pf.setServerGroup("a");
pf.setIdleTimeout(-1);
- startBridgeClientInVM(null, pf.getPoolAttributes(), new String[] {REGION_NAME});
- waitForPrefilledConnections(null, 12);
-
- checkConnectionCount(vm1, 6);
- checkConnectionCount(vm2, 6);
- checkConnectionCount(vm3, 0);
-
+ startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME });
+ waitForPrefilledConnections(12);
+
+ vm1.invoke("Check Connection Count", () -> checkConnectionCount(6));
+ vm2.invoke("Check Connection Count", () -> checkConnectionCount(6));
+ vm3.invoke("Check Connection Count", () -> checkConnectionCount(0));
+
LogWriterUtils.getLogWriter().info("pool1 prefilled");
-
+
PoolFactoryImpl pf2 = (PoolFactoryImpl) PoolManager.createFactory();
pf2.init(pf.getPoolAttributes());
pf2.setServerGroup("b");
- PoolImpl pool2= (PoolImpl) pf2.create("testPool2");
- waitForPrefilledConnections(null, 12, "testPool2");
+ PoolImpl pool2 = (PoolImpl) pf2.create("testPool2");
+ waitForPrefilledConnections(12, "testPool2");
// The load will not be perfect, because we created all of the connections
//for group A first.
- checkConnectionCount(vm1, 6);
- checkConnectionCount(vm2, 9);
- checkConnectionCount(vm3, 9);
-
+ vm1.invoke("Check Connection Count", () -> checkConnectionCount(6));
+ vm2.invoke("Check Connection Count", () -> checkConnectionCount(9));
+ vm3.invoke("Check Connection Count", () -> checkConnectionCount(9));
+
LogWriterUtils.getLogWriter().info("pool2 prefilled");
-
+
ServerLocation location1 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort1);
PoolImpl pool1 = (PoolImpl) PoolManager.getAll().get(POOL_NAME);
Assert.assertEquals("a", pool1.getServerGroup());
-
+
//Use up all of the pooled connections on pool1, and acquire 3 more
- for(int i = 0; i < 15; i++) {
+ for (int i = 0; i < 15; i++) {
pool1.acquireConnection();
}
-
+
LogWriterUtils.getLogWriter().info("aquired 15 connections in pool1");
-
+
//now the load should be equal
- checkConnectionCount(vm1, 9);
- checkConnectionCount(vm2, 9);
- checkConnectionCount(vm3, 9);
-
+ vm1.invoke("Check Connection Count", () -> checkConnectionCount(9));
+ vm2.invoke("Check Connection Count", () -> checkConnectionCount(9));
+ vm3.invoke("Check Connection Count", () -> checkConnectionCount(9));
+
//use up all of the pooled connections on pool2
- for(int i = 0; i < 12; i++) {
+ for (int i = 0; i < 12; i++) {
pool2.acquireConnection();
}
-
+
LogWriterUtils.getLogWriter().info("aquired 12 connections in pool2");
-
+
//interleave creating connections in both pools
- for(int i = 0; i < 6; i++) {
+ for (int i = 0; i < 6; i++) {
pool1.acquireConnection();
pool2.acquireConnection();
}
-
+
LogWriterUtils.getLogWriter().info("interleaved 6 connections from pool1 with 6 connections from pool2");
-
+
//The load should still be balanced
- checkConnectionCount(vm1, 13);
- checkConnectionCount(vm2, 13);
- checkConnectionCount(vm3, 13);
-
+ vm1.invoke("Check Connection Count", () -> checkConnectionCount(13));
+ vm2.invoke("Check Connection Count", () -> checkConnectionCount(13));
+ vm3.invoke("Check Connection Count", () -> checkConnectionCount(13));
+
}
-
+
public void testCustomLoadProbe() throws Exception {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
-// VM vm3 = host.getVM(3);
-
+ // VM vm3 = host.getVM(3);
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- startLocatorInVM(vm0, locatorPort, "");
+ vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
String locators = getLocatorString(host, locatorPort);
-
- ServerLoad load1= new ServerLoad(.3f, .01f, .44f, 4564f);
- ServerLoad load2= new ServerLoad(23.2f, 1.1f, 22.3f, .3f);
- int serverPort1 = startBridgeServerInVM(vm1, null, locators, new String[] {REGION_NAME}, new MyLoadProbe(load1 ));
- int serverPort2 = startBridgeServerInVM(vm2, null, locators, new String[] {REGION_NAME}, new MyLoadProbe(load2 ));
-
+
+ final ServerLoad load1 = new ServerLoad(.3f, .01f, .44f, 4564f);
+ final ServerLoad load2 = new ServerLoad(23.2f, 1.1f, 22.3f, .3f);
+ int serverPort1 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(null, locators, new String[] { REGION_NAME }, new MyLoadProbe(load1)));
+ int serverPort2 = vm2.invoke("Start BridgeServer", () -> startBridgeServer(null, locators, new String[] { REGION_NAME }, new MyLoadProbe(load2)));
+
HashMap expected = new HashMap();
ServerLocation l1 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort1);
ServerLocation l2 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort2);
expected.put(l1, load1);
expected.put(l2, load2);
- checkLocatorLoad(vm0, expected);
-
+ vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
+
load1.setConnectionLoad(25f);
- changeLoad(vm1, load1);
+ vm1.invoke("changeLoad", () -> changeLoad(load1));
load2.setSubscriptionConnectionLoad(3.5f);
- changeLoad(vm2, load2);
- checkLocatorLoad(vm0, expected);
-
- load1 = new ServerLoad(1f, .1f, 0f, 1f);
- load2 = new ServerLoad(2f, 5f, 0f, 2f);
- expected.put(l1, load1);
- expected.put(l2, load2);
- changeLoad(vm1, load1);
- changeLoad(vm2, load2);
- checkLocatorLoad(vm0, expected);
-
+ vm2.invoke("changeLoad", () -> changeLoad(load2));
+ vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
+
+ final ServerLoad load1Updated = new ServerLoad(1f, .1f, 0f, 1f);
+ final ServerLoad load2Updated = new ServerLoad(2f, 5f, 0f, 2f);
+ expected.put(l1, load1Updated);
+ expected.put(l2, load2Updated);
+ vm1.invoke("changeLoad", () -> changeLoad(load1Updated));
+ vm2.invoke("changeLoad", () -> changeLoad(load2Updated));
+ vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
+
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort);
pf.setMinConnections(20);
pf.setSubscriptionEnabled(true);
pf.setIdleTimeout(-1);
- startBridgeClientInVM(null, pf.getPoolAttributes(), new String[] {REGION_NAME});
- waitForPrefilledConnections(null, 20);
-
+ startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME });
+ waitForPrefilledConnections(20);
+
//The first 10 connections should go to vm1, then 1 to vm2, then another 9 to vm1
//because they have unequal values for loadPerConnection
- checkConnectionCount(vm1, 19);
- checkConnectionCount(vm2, 1);
+ vm1.invoke("Check Connection Count", () -> checkConnectionCount(19));
+ vm2.invoke("Check Connection Count", () -> checkConnectionCount(1));
}
-
- public void checkLocatorLoad(VM vm, final Map expected) {
- vm.invoke(new SerializableRunnable() {
- public void run() {
- List locators = Locator.getLocators();
- Assert.assertEquals(1, locators.size());
- InternalLocator locator = (InternalLocator) locators.get(0);
- final ServerLocator sl = locator.getServerLocatorAdvisee();
- InternalLogWriter log = new LocalLogWriter(InternalLogWriter.FINEST_LEVEL, System.out);
- sl.getDistributionAdvisor().dumpProfiles("PROFILES= ");
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return expected.equals(sl.getLoadMap());
- }
- public String description() {
- return "load map never became equal to " + expected;
- }
- };
- Wait.waitForCriterion(ev, MAX_WAIT, 200, true);
- }
- });
+
+ public void checkLocatorLoad(final Map expected) {
+ List locators = Locator.getLocators();
+ Assert.assertEquals(1, locators.size());
+ InternalLocator locator = (InternalLocator) locators.get(0);
+ final ServerLocator sl = locator.getServerLocatorAdvisee();
+ InternalLogWriter log = new LocalLogWriter(InternalLogWriter.FINEST_LEVEL, System.out);
+ sl.getDistributionAdvisor().dumpProfiles("PROFILES= ");
+ Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
+ .timeout(300, TimeUnit.SECONDS).until(() -> expected.equals(sl.getLoadMap()));
}
-
- private void changeLoad(VM vm, final ServerLoad newLoad) {
- vm.invoke(new SerializableRunnable() {
-
- public void run() {
- Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
- CacheServer server = (CacheServer) cache.getCacheServers().get(0);
- MyLoadProbe probe = (MyLoadProbe) server.getLoadProbe();
- probe.setLoad(newLoad);
- }
-
- });
+
+ private void changeLoad(final ServerLoad newLoad) {
+ Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
+ CacheServer server = cache.getCacheServers().get(0);
+ MyLoadProbe probe = (MyLoadProbe) server.getLoadProbe();
+ probe.setLoad(newLoad);
}
-
+
private static class MyLoadProbe extends ServerLoadProbeAdapter implements Serializable {
private ServerLoad load;
-
+
public MyLoadProbe(ServerLoad load) {
this.load = load;
}
-
+
public ServerLoad getLoad(ServerMetrics metrics) {
float connectionLoad = load.getConnectionLoad()
+ metrics.getConnectionCount() * load.getLoadPerConnection();
@@ -493,7 +440,7 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
return new ServerLoad(connectionLoad, load.getLoadPerConnection(),
queueLoad, load.getLoadPerSubscriptionConnection());
}
-
+
public void setLoad(ServerLoad load) {
this.load = load;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c33efb60/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java
index 2207e1d..af5ba9c 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java
@@ -16,52 +16,35 @@
*/
package com.gemstone.gemfire.cache.client.internal;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.*;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolManager;
-import com.gemstone.gemfire.cache.server.ServerLoadProbe;
import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.server.ServerLoadProbe;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.PoolFactoryImpl;
-import com.gemstone.gemfire.test.dunit.Assert;
-import com.gemstone.gemfire.test.dunit.DistributedTestCase;
-import com.gemstone.gemfire.test.dunit.Host;
-import com.gemstone.gemfire.test.dunit.Invoke;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.NetworkUtils;
-import com.gemstone.gemfire.test.dunit.SerializableCallable;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.*;
/**
*
*/
-public abstract class LocatorTestBase extends DistributedTestCase {
+public abstract class LocatorTestBase extends DistributedTestCase {
protected static final String CACHE_KEY = "CACHE";
protected static final String LOCATOR_KEY = "LOCATOR";
protected static final String REGION_NAME = "A_REGION";
protected static final String POOL_NAME = "daPool";
protected static final Object CALLBACK_KEY = "callback";
- /** A map for storing temporary objects in a remote VM so that they can be used
+ /**
+ * A map for storing temporary objects in a remote VM so that they can be used
* between calls. Cleared after each test.
*/
protected static final HashMap remoteObjects = new HashMap();
@@ -69,264 +52,211 @@ public abstract class LocatorTestBase extends DistributedTestCase {
public LocatorTestBase(String name) {
super(name);
}
-
+
@Override
public final void preTearDown() throws Exception {
-
+
SerializableRunnable tearDown = new SerializableRunnable("tearDown") {
public void run() {
Locator locator = (Locator) remoteObjects.get(LOCATOR_KEY);
- if(locator != null) {
+ if (locator != null) {
try {
locator.stop();
- } catch(Exception e) {
+ } catch (Exception e) {
//do nothing
}
}
-
+
Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
- if(cache != null) {
+ if (cache != null) {
try {
cache.close();
- } catch(Exception e) {
+ } catch (Exception e) {
//do nothing
}
}
remoteObjects.clear();
-
+
}
};
//We seem to like leaving the DS open if we can for
//speed, but let's at least destroy our cache and locator.
Invoke.invokeInEveryVM(tearDown);
tearDown.run();
-
+
postTearDownLocatorTestBase();
}
-
+
protected void postTearDownLocatorTestBase() throws Exception {
}
-
- protected void startLocatorInVM(final VM vm, final int locatorPort, final String otherLocators) {
- vm.invoke(new SerializableRunnable("Create Locator") {
- final String testName= getUniqueName();
- public void run() {
- disconnectFromDS();
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
- props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators);
- props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
- props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
- try {
- File logFile = new File(testName + "-locator" + locatorPort
- + ".log");
- InetAddress bindAddr = null;
- try {
- bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vm.getHost()));
- } catch (UnknownHostException uhe) {
- Assert.fail("While resolving bind address ", uhe);
- }
- Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props);
- remoteObjects.put(LOCATOR_KEY, locator);
- } catch (IOException ex) {
- Assert.fail("While starting locator on port " + locatorPort, ex);
- }
- }
- });
+ protected void startLocator(final Host vmHost, final int locatorPort, final String otherLocators) {
+ disconnectFromDS();
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
+ props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators);
+ props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
+ props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ File logFile = new File(getUniqueName() + "-locator" + locatorPort + ".log");
+ try {
+ InetAddress bindAddress = InetAddress.getByName(NetworkUtils.getServerHostName(vmHost));
+ Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddress, props);
+ remoteObjects.put(LOCATOR_KEY, locator);
+ } catch (UnknownHostException uhe) {
+ Assert.fail("While resolving bind address ", uhe);
+ } catch (IOException ex) {
+ Assert.fail("While starting locator on port " + locatorPort, ex);
+ }
}
-
-
-
- protected void stopLocatorInVM(VM vm) {
- vm.invoke(new SerializableRunnable("Stop Locator") {
- public void run() {
- Locator locator = (Locator) remoteObjects.remove(LOCATOR_KEY);
- locator.stop();
- }
- });
+
+ protected void stopLocator() {
+ Locator locator = (Locator) remoteObjects.remove(LOCATOR_KEY);
+ locator.stop();
}
-
- protected int startBridgeServerInVM(VM vm, String[] groups, String locators) {
- return startBridgeServerInVM(vm, groups, locators, new String[] {REGION_NAME});
+
+ protected int startBridgeServer(String[] groups, String locators) throws IOException {
+ return startBridgeServer(groups, locators, new String[] { REGION_NAME });
}
-
- protected int addCacheServerInVM(VM vm, final String[] groups) {
- SerializableCallable connect =
- new SerializableCallable("Add Bridge server") {
- public Object call() throws Exception {
- Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
- CacheServer server = cache.addCacheServer();
- final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort();
- server.setPort(serverPort);
- server.setGroups(groups);
- server.start();
- return new Integer(serverPort);
- }
- };
- Integer port = (Integer) vm.invoke(connect);
- return port.intValue();
+ protected int addCacheServer(final String[] groups) throws IOException {
+ Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
+ CacheServer server = cache.addCacheServer();
+ server.setPort(0);
+ server.setGroups(groups);
+ server.start();
+ return new Integer(server.getPort());
}
-
- protected int startBridgeServerInVM(VM vm, final String[] groups, final String locators, final String[] regions) {
- return startBridgeServerInVM(vm, groups, locators, regions, CacheServer.DEFAULT_LOAD_PROBE);
+
+ protected int startBridgeServer(final String[] groups, final String locators, final String[] regions) throws IOException {
+ return startBridgeServer(groups, locators, regions, CacheServer.DEFAULT_LOAD_PROBE);
}
-
- protected int startBridgeServerInVM(VM vm, final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) {
- SerializableCallable connect =
- new SerializableCallable("Start bridge server") {
- public Object call() throws IOException {
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
- props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
- DistributedSystem ds = getSystem(props);
- Cache cache = CacheFactory.create(ds);
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- factory.setEnableBridgeConflation(true);
- factory.setDataPolicy(DataPolicy.REPLICATE);
- RegionAttributes attrs = factory.create();
- for(int i = 0; i < regions.length; i++) {
- cache.createRegion(regions[i], attrs);
- }
- CacheServer server = cache.addCacheServer();
- final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort();
- server.setPort(serverPort);
- server.setGroups(groups);
- server.setLoadProbe(probe);
- server.start();
-
- remoteObjects.put(CACHE_KEY, cache);
-
- return new Integer(serverPort);
- }
- };
- Integer port = (Integer) vm.invoke(connect);
- return port.intValue();
+
+ protected int startBridgeServer(final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) throws IOException {
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
+ props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
+ DistributedSystem ds = getSystem(props);
+ Cache cache = CacheFactory.create(ds);
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setEnableBridgeConflation(true);
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ RegionAttributes attributes = factory.create();
+ for (int i = 0; i < regions.length; i++) {
+ cache.createRegion(regions[i], attributes);
+ }
+ CacheServer server = cache.addCacheServer();
+ server.setPort(0);
+ server.setGroups(groups);
+ server.setLoadProbe(probe);
+ server.start();
+
+ remoteObjects.put(CACHE_KEY, cache);
+
+ return new Integer(server.getPort());
}
-
- protected int startBridgeServerWithEmbeddedLocator(VM vm, final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) {
- SerializableCallable connect =
- new SerializableCallable("Start bridge server") {
- public Object call() throws IOException {
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
- props.setProperty(DistributionConfig.START_LOCATOR_NAME, locators);
- props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
- DistributedSystem ds = getSystem(props);
- Cache cache = CacheFactory.create(ds);
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- factory.setEnableBridgeConflation(true);
- factory.setDataPolicy(DataPolicy.REPLICATE);
- RegionAttributes attrs = factory.create();
- for(int i = 0; i < regions.length; i++) {
- cache.createRegion(regions[i], attrs);
- }
- CacheServer server = cache.addCacheServer();
- server.setGroups(groups);
- server.setLoadProbe(probe);
- final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort();
- server.setPort(serverPort);
- server.start();
-
- remoteObjects.put(CACHE_KEY, cache);
-
- return new Integer(serverPort);
- }
- };
- Integer port = (Integer) vm.invoke(connect);
- return port.intValue();
+
+ protected int startBridgeServerWithEmbeddedLocator(final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe)
+ throws IOException {
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
+ props.setProperty(DistributionConfig.START_LOCATOR_NAME, locators);
+ props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
+ DistributedSystem ds = getSystem(props);
+ Cache cache = CacheFactory.create(ds);
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setEnableBridgeConflation(true);
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ RegionAttributes attrs = factory.create();
+ for (int i = 0; i < regions.length; i++) {
+ cache.createRegion(regions[i], attrs);
+ }
+ CacheServer server = cache.addCacheServer();
+ server.setGroups(groups);
+ server.setLoadProbe(probe);
+ server.setPort(0);
+ server.start();
+
+ remoteObjects.put(CACHE_KEY, cache);
+
+ return new Integer(server.getPort());
}
-
- protected void startBridgeClientInVM(VM vm, final String group, final String host, final int port) throws Exception {
- startBridgeClientInVM(vm, group, host, port, new String[] {REGION_NAME});
+
+ protected void startBridgeClient(final String group, final String host, final int port) throws Exception {
+ startBridgeClient(group, host, port, new String[] { REGION_NAME });
}
-
- protected void startBridgeClientInVM(VM vm, final String group, final String host, final int port, final String[] regions) throws Exception {
+ protected void startBridgeClient(final String group, final String host, final int port, final String[] regions) throws Exception {
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(host, port)
- .setServerGroup(group)
- .setPingInterval(200)
- .setSubscriptionEnabled(true)
- .setSubscriptionRedundancy(-1);
- startBridgeClientInVM(vm, pf.getPoolAttributes(), regions);
+ .setServerGroup(group)
+ .setPingInterval(200)
+ .setSubscriptionEnabled(true)
+ .setSubscriptionRedundancy(-1);
+ startBridgeClient(pf.getPoolAttributes(), regions);
}
-
- protected void startBridgeClientInVM(VM vm, final Pool pool, final String[] regions) throws Exception {
- SerializableRunnable connect =
- new SerializableRunnable("Start bridge client") {
- public void run() {
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
- props.setProperty(DistributionConfig.LOCATORS_NAME, "");
- DistributedSystem ds = getSystem(props);
- Cache cache = CacheFactory.create(ds);
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.LOCAL);
-// factory.setEnableBridgeConflation(true);
-// factory.setDataPolicy(DataPolicy.NORMAL);
- factory.setPoolName(POOL_NAME);
- PoolFactoryImpl pf= (PoolFactoryImpl) PoolManager.createFactory();
- pf.init(pool);
- LocatorDiscoveryCallback locatorCallback = new MyLocatorCallback();
- remoteObjects.put(CALLBACK_KEY, locatorCallback);
- pf.setLocatorDiscoveryCallback(locatorCallback);
- pf.create(POOL_NAME);
-
-
- RegionAttributes attrs = factory.create();
- for(int i = 0; i < regions.length; i++) {
- cache.createRegion(regions[i], attrs);
- }
-
- remoteObjects.put(CACHE_KEY, cache);
- }
- };
-
- if(vm == null) {
- connect.run();
- } else {
- vm.invoke(connect);
+
+ protected void startBridgeClient(final Pool pool, final String[] regions) throws Exception {
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
+ props.setProperty(DistributionConfig.LOCATORS_NAME, "");
+ DistributedSystem ds = getSystem(props);
+ Cache cache = CacheFactory.create(ds);
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ // factory.setEnableBridgeConflation(true);
+ // factory.setDataPolicy(DataPolicy.NORMAL);
+ factory.setPoolName(POOL_NAME);
+ PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory();
+ pf.init(pool);
+ LocatorDiscoveryCallback locatorCallback = new MyLocatorCallback();
+ remoteObjects.put(CALLBACK_KEY, locatorCallback);
+ pf.setLocatorDiscoveryCallback(locatorCallback);
+ pf.create(POOL_NAME);
+
+ RegionAttributes attrs = factory.create();
+ for (int i = 0; i < regions.length; i++) {
+ cache.createRegion(regions[i], attrs);
}
+
+ remoteObjects.put(CACHE_KEY, cache);
}
-
+
protected void stopBridgeMemberVM(VM vm) {
- vm.invoke(new SerializableRunnable("Stop bridge member") {
- public void run() {
- Cache cache = (Cache) remoteObjects.remove(CACHE_KEY);
- cache.close();
- disconnectFromDS();
- }
- });
+ vm.invoke(new SerializableRunnable("Stop bridge member") {
+ public void run() {
+ Cache cache = (Cache) remoteObjects.remove(CACHE_KEY);
+ cache.close();
+ disconnectFromDS();
+ }
+ });
}
-
+
public String getLocatorString(Host host, int locatorPort) {
- return getLocatorString(host, new int[] {locatorPort});
+ return getLocatorString(host, new int[] { locatorPort });
}
-
+
public String getLocatorString(Host host, int[] locatorPorts) {
StringBuffer str = new StringBuffer();
- for(int i = 0; i < locatorPorts.length; i++) {
+ for (int i = 0; i < locatorPorts.length; i++) {
str.append(NetworkUtils.getServerHostName(host))
.append("[")
.append(locatorPorts[i])
.append("]");
- if(i < locatorPorts.length - 1) {
+ if (i < locatorPorts.length - 1) {
str.append(",");
}
}
-
+
return str.toString();
}
-
+
protected static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter {
private final Set discoveredLocators = new HashSet();
private final Set removedLocators = new HashSet();
-
+
public synchronized void locatorsDiscovered(List locators) {
discoveredLocators.addAll(locators);
notifyAll();
@@ -336,29 +266,29 @@ public abstract class LocatorTestBase extends DistributedTestCase {
removedLocators.addAll(locators);
notifyAll();
}
-
+
public boolean waitForDiscovery(InetSocketAddress locator, long time) throws InterruptedException {
return waitFor(discoveredLocators, locator, time);
}
-
+
public boolean waitForRemove(InetSocketAddress locator, long time) throws InterruptedException {
return waitFor(removedLocators, locator, time);
}
-
+
private synchronized boolean waitFor(Set set, InetSocketAddress locator, long time) throws InterruptedException {
long remaining = time;
long endTime = System.currentTimeMillis() + time;
- while(!set.contains(locator) && remaining >= 0) {
+ while (!set.contains(locator) && remaining >= 0) {
wait(remaining);
- remaining = endTime - System.currentTimeMillis();
+ remaining = endTime - System.currentTimeMillis();
}
return set.contains(locator);
}
-
+
public synchronized Set getDiscovered() {
return new HashSet(discoveredLocators);
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c33efb60/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java
index de43c29..0f08456 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java
@@ -56,11 +56,11 @@ public class Bug47667DUnitTest extends LocatorTestBase {
final int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
final String locatorHost = NetworkUtils.getServerHostName(host);
- startLocatorInVM(locator, locatorPort, "");
+ locator.invoke("Start Locator",() ->startLocator(locator.getHost(), locatorPort, ""));
String locString = getLocatorString(host, locatorPort);
- startBridgeServerInVM(server1, new String[] {"R1"}, locString, new String[] {"R1"});
- startBridgeServerInVM(server2, new String[] {"R2"}, locString, new String[] {"R2"});
+ server1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"R1"}, locString, new String[] {"R1"}));
+ server2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"R2"}, locString, new String[] {"R2"}));
client.invoke(new SerializableCallable() {
@Override
@@ -70,15 +70,15 @@ public class Bug47667DUnitTest extends LocatorTestBase {
ClientCache cache = ccf.create();
PoolManager.createFactory().addLocator(locatorHost, locatorPort).setServerGroup("R1").create("R1");
PoolManager.createFactory().addLocator(locatorHost, locatorPort).setServerGroup("R2").create("R2");
- Region r1 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R1").create("R1");
- Region r2 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R2").create("R2");
- CacheTransactionManager mgr = cache.getCacheTransactionManager();
- mgr.begin();
- r1.put(1, "value1");
- mgr.commit();
- mgr.begin();
- r2.put(2, "value2");
- mgr.commit();
+ Region region1 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R1").create("R1");
+ Region region2 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R2").create("R2");
+ CacheTransactionManager transactionManager = cache.getCacheTransactionManager();
+ transactionManager.begin();
+ region1.put(1, "value1");
+ transactionManager.commit();
+ transactionManager.begin();
+ region2.put(2, "value2");
+ transactionManager.commit();
return null;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c33efb60/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java
index d784397..167cc3a 100644
--- a/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java
@@ -16,26 +16,9 @@
*/
package com.gemstone.gemfire.management;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collections;
-import java.util.Properties;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServer;
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import javax.management.ObjectName;
-
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.client.internal.LocatorTestBase;
-import com.gemstone.gemfire.cache.query.IndexExistsException;
-import com.gemstone.gemfire.cache.query.IndexInvalidException;
-import com.gemstone.gemfire.cache.query.IndexNameConflictException;
-import com.gemstone.gemfire.cache.query.QueryService;
-import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.*;
import com.gemstone.gemfire.cache.query.cq.dunit.CqQueryDUnitTest;
import com.gemstone.gemfire.cache.query.internal.cq.CqService;
import com.gemstone.gemfire.cache.server.CacheServer;
@@ -46,37 +29,34 @@ import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.management.internal.JmxManagerLocatorRequest;
-import com.gemstone.gemfire.management.internal.JmxManagerLocatorResponse;
import com.gemstone.gemfire.management.internal.MBeanJMXAdapter;
import com.gemstone.gemfire.management.internal.SystemManagementService;
-import com.gemstone.gemfire.test.dunit.Assert;
-import com.gemstone.gemfire.test.dunit.Host;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.NetworkUtils;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.dunit.*;
+
+import javax.management.*;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Properties;
/**
* Cache Server related management test cases
- *
- *
*/
public class CacheServerManagementDUnitTest extends LocatorTestBase {
private static final long serialVersionUID = 1L;
-
- private static int CONNECT_LOCATOR_TIMEOUT_MS = 30000;
+
+ private static int CONNECT_LOCATOR_TIMEOUT_MS = 30000;
private ManagementTestBase helper;
private static final String queryName = "testClientWithFeederAndCQ_0";
private static final String indexName = "testIndex";
-
+
private static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer;
-
protected CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest(
"CqDataDUnitTest");
@@ -84,7 +64,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
public CacheServerManagementDUnitTest(String name) {
super(name);
this.helper = new ManagementTestBase(name);
-
+
}
@Override
@@ -98,7 +78,6 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
}
/**
- *
* @throws Exception
*/
public void testCacheServerMBean() throws Exception {
@@ -112,12 +91,11 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
helper.startManagingNode(managingNode);
//helper.createCache(server);
int serverPort = AvailablePortHelper.getRandomAvailableTCPPort();
- cqDUnitTest.createServer(server,serverPort);
-
-
+ cqDUnitTest.createServer(server, serverPort);
+
DistributedMember member = helper.getMember(server);
-
- verifyCacheServer(server,serverPort);
+
+ verifyCacheServer(server, serverPort);
final int port = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
@@ -145,10 +123,10 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
// Close.
Wait.pause(2000);
- checkNavigation(managingNode,member,serverPort);
- verifyIndex(server,serverPort);
+ checkNavigation(managingNode, member, serverPort);
+ verifyIndex(server, serverPort);
// This will test all CQs and will close the cq in its final step
- verifyCacheServerRemote(managingNode, member,serverPort);
+ verifyCacheServerRemote(managingNode, member, serverPort);
verifyClosedCQ(server);
@@ -162,30 +140,30 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
/**
* Test for client server connection related management artifacts
- * like notifications
+ * like notifications
+ *
* @throws Exception
*/
-
+
public void testCacheClient() throws Exception {
-
+
final Host host = Host.getHost(0);
VM locator = host.getVM(0);
VM server = host.getVM(1);
VM client = host.getVM(2);
-
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- startLocatorInVM(locator, locatorPort, "");
-
- String locators = NetworkUtils.getServerHostName(locator.getHost())+ "[" + locatorPort + "]";
-
-
- int serverPort = startBridgeServerInVM(server, null, locators);
-
- addClientNotifListener(server,serverPort);
+ locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, ""));
+
+ String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]";
+
+ int serverPort = server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators));
+
+ addClientNotifListener(server, serverPort);
// Start a client and make sure that proper notification is received
- startBridgeClientInVM(client, null, NetworkUtils.getServerHostName(locator.getHost()), locatorPort);
-
+ client.invoke("Start BridgeClient", () -> startBridgeClient(null, NetworkUtils.getServerHostName(locator.getHost()), locatorPort));
+
//stop the client and make sure the bridge server notifies
stopBridgeMemberVM(client);
helper.closeCache(locator);
@@ -193,13 +171,14 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
helper.closeCache(client);
}
-
+
/**
* Intention of this test is to check if a node becomes manager after all the nodes are alive
* it should have all the information of all the members.
- *
+ * <p>
* Thats why used service.getLocalManager().runManagementTaskAdhoc() to make node
* ready for federation when manager node comes up
+ *
* @throws Exception
*/
@@ -208,96 +187,72 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
final Host host = Host.getHost(0);
VM locator = host.getVM(0);
VM server = host.getVM(1);
-
+
//Step 1:
final int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- startLocator(locator, locatorPort, "");
+ locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, ""));
- String locators = NetworkUtils.getServerHostName(locator.getHost())+ "[" + locatorPort + "]";
-
- //Step 2:
- int serverPort = startBridgeServerInVM(server, null, locators);
-
- //Step 3:
- server.invoke(new SerializableRunnable("Check Server") {
-
- public void run() {
- Cache cache = GemFireCacheImpl.getInstance();
- assertNotNull(cache);
- SystemManagementService service = (SystemManagementService)ManagementService
- .getExistingManagementService(cache);
- assertNotNull(service);
- assertFalse(service.isManager());
- assertNotNull(service.getMemberMXBean());
- service.getLocalManager().runManagementTaskAdhoc();
+ String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]";
+ //Step 2:
+ server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators));
- }
+ //Step 3:
+ server.invoke("Check Server", () -> {
+ Cache cache = GemFireCacheImpl.getInstance();
+ assertNotNull(cache);
+ SystemManagementService service = (SystemManagementService) ManagementService
+ .getExistingManagementService(cache);
+ assertNotNull(service);
+ assertFalse(service.isManager());
+ assertNotNull(service.getMemberMXBean());
+ service.getLocalManager().runManagementTaskAdhoc();
});
-
- //Step 4:
- JmxManagerLocatorResponse locRes = JmxManagerLocatorRequest.send(locator
- .getHost().getHostName(), locatorPort, CONNECT_LOCATOR_TIMEOUT_MS, Collections.<String, String> emptyMap());
-
- //Step 5:
- locator.invoke(new SerializableRunnable("Check locator") {
- public void run() {
- Cache cache = GemFireCacheImpl.getInstance();
- assertNotNull(cache);
- ManagementService service = ManagementService
- .getExistingManagementService(cache);
- assertNotNull(service);
- assertTrue(service.isManager());
- LocatorMXBean bean = service.getLocalLocatorMXBean();
- assertEquals(locatorPort, bean.getPort());
- DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
- ObjectName[] names = dsBean.listMemberObjectNames();
-
- assertEquals(2,dsBean.listMemberObjectNames().length);
-
- }
+ //Step 4:
+ JmxManagerLocatorRequest.send(locator
+ .getHost().getHostName(), locatorPort, CONNECT_LOCATOR_TIMEOUT_MS, Collections.<String, String>emptyMap());
+
+ //Step 5:
+ locator.invoke("Check locator", () -> {
+ Cache cache = GemFireCacheImpl.getInstance();
+ assertNotNull(cache);
+ ManagementService service = ManagementService
+ .getExistingManagementService(cache);
+ assertNotNull(service);
+ assertTrue(service.isManager());
+ LocatorMXBean bean = service.getLocalLocatorMXBean();
+ assertEquals(locatorPort, bean.getPort());
+ DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
+
+ assertEquals(2, dsBean.listMemberObjectNames().length);
});
-
-
helper.closeCache(locator);
helper.closeCache(server);
-
-
+
}
-
-
- protected void startLocator(final VM vm, final int locatorPort, final String otherLocators) {
- vm.invoke(new SerializableRunnable("Create Locator") {
- final String testName= getUniqueName();
- public void run() {
- disconnectFromDS();
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
- props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators);
- props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
- props.setProperty(DistributionConfig.JMX_MANAGER_HTTP_PORT_NAME, "0");
- props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
- try {
- File logFile = new File(testName + "-locator" + locatorPort
- + ".log");
- InetAddress bindAddr = null;
- try {
- bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vm.getHost()));
- } catch (UnknownHostException uhe) {
- Assert.fail("While resolving bind address ", uhe);
- }
- Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props);
- remoteObjects.put(LOCATOR_KEY, locator);
- } catch (IOException ex) {
- Assert.fail("While starting locator on port " + locatorPort, ex);
- }
- }
- });
+ protected void startLocator(Host vmHost, final int locatorPort, final String otherLocators) {
+ disconnectFromDS();
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
+ props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators);
+ props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
+ props.setProperty(DistributionConfig.JMX_MANAGER_HTTP_PORT_NAME, "0");
+ props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ File logFile = new File(getUniqueName() + "-locator" + locatorPort + ".log");
+ try {
+ InetAddress bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vmHost));
+ Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props);
+ remoteObjects.put(LOCATOR_KEY, locator);
+ } catch (UnknownHostException uhe) {
+ Assert.fail("While resolving bind address ", uhe);
+ } catch (IOException ex) {
+ Assert.fail("While starting locator on port " + locatorPort, ex);
+ }
}
-
+
protected void checkNavigation(final VM vm,
final DistributedMember cacheServerMember, final int serverPort) {
SerializableRunnable checkNavigation = new SerializableRunnable(
@@ -327,14 +282,14 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
};
vm.invoke(checkNavigation);
}
-
+
/**
* Verify the Cache Server details
- *
+ *
* @param vm
*/
@SuppressWarnings("serial")
- protected void addClientNotifListener(final VM vm , final int serverPort) throws Exception {
+ protected void addClientNotifListener(final VM vm, final int serverPort) throws Exception {
SerializableRunnable addClientNotifListener = new SerializableRunnable(
"Add Client Notif Listener") {
public void run() {
@@ -359,18 +314,19 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
TestCacheServerNotif nt = new TestCacheServerNotif();
try {
mbeanServer.addNotificationListener(MBeanJMXAdapter
- .getClientServiceMBeanName(serverPort,cache.getDistributedSystem().getMemberId()), nt, null, null);
+ .getClientServiceMBeanName(serverPort, cache.getDistributedSystem().getMemberId()), nt, null, null);
} catch (InstanceNotFoundException e) {
fail("Failed With Exception " + e);
}
-
+
}
};
vm.invoke(addClientNotifListener);
}
+
/**
* Verify the closed CQ which is closed from Managing Node
- *
+ *
* @param vm
*/
@SuppressWarnings("serial")
@@ -405,7 +361,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
bean.removeIndex(indexName);
} catch (Exception e) {
fail("Failed With Exception " + e);
-
+
}
assertEquals(bean.getIndexCount(), 0);
@@ -416,7 +372,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
/**
* Verify the closed CQ which is closed from Managing Node
- *
+ *
* @param vm
*/
@SuppressWarnings("serial")
@@ -434,11 +390,9 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
vm.invoke(verifyClosedCQ);
}
-
-
/**
* Verify the Cache Server details
- *
+ *
* @param vm
*/
@SuppressWarnings("serial")
@@ -496,7 +450,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
/**
* Verify the Cache Server details
- *
+ *
* @param vm
*/
@SuppressWarnings("serial")
@@ -522,14 +476,14 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
LogWriterUtils.getLogWriter().info(
"<ExpectedString> Active Query Count "
+ bean.getActiveCQCount() + "</ExpectedString> ");
-
+
LogWriterUtils.getLogWriter().info(
"<ExpectedString> Registered Query Count "
+ bean.getRegisteredCQCount() + "</ExpectedString> ");
- assertTrue(bean.showAllClientStats()[0].getClientCQCount() == 1);
- int numQueues = bean.getNumSubscriptions();
- assertEquals(numQueues, 1);
+ assertTrue(bean.showAllClientStats()[0].getClientCQCount() == 1);
+ int numQueues = bean.getNumSubscriptions();
+ assertEquals(numQueues, 1);
// test for client connection Count
/* @TODO */
@@ -552,11 +506,9 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
};
vm.invoke(verifyCacheServerRemote);
}
-
+
/**
* Notification handler
- *
- *
*/
private static class TestCacheServerNotif implements
NotificationListener {