You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/13 18:37:38 UTC
[28/32] incubator-geode git commit: Revert "GEODE-1376: Cleaned up
server port to be '0'."
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d1a0748b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java
index 176fbea..9ef87d2 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java
@@ -16,6 +16,17 @@
*/
package com.gemstone.gemfire.cache.client.internal;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.locator.ClientConnectionRequest;
@@ -39,31 +50,24 @@ import com.gemstone.gemfire.internal.logging.LocalLogWriter;
import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
import com.gemstone.gemfire.test.dunit.VM;
-import com.jayway.awaitility.Awaitility;
-import org.junit.Assert;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
/**
*
*/
public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
-
+
/**
* The number of connections that we can be off by in the balancing tests
* We need this little fudge factor, because the locator can receive an update
* from the bridge server after it has incremented its counter for a client
* connection, but the client hasn't connected yet. This wipes out the estimation
* on the locator. This means that we may be slightly off in our balance.
- * <p>
+ *
* TODO grid fix this hole in the locator.
*/
private static final int ALLOWABLE_ERROR_IN_COUNT = 1;
@@ -74,144 +78,149 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
}
/**
- * Test the locator discovers a bridge server and is initialized with
+ * Test the locator discovers a bridge server and is initialized with
* the correct load for that bridge server.
*/
public void testDiscovery() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
-
+ VM vm2 = host.getVM(2);
+// vm0.invoke(new SerializableRunnable() {
+// public void run() {
+// System.setProperty("gemfire.DistributionAdvisor.VERBOSE", "true");
+// }
+// });
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
-
+ startLocatorInVM(vm0, locatorPort, "");
+
String locators = getLocatorString(host, locatorPort);
-
- int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
-
+
+ int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
+
ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(vm0
.getHost()), serverPort);
Map expected = new HashMap();
expected.put(expectedLocation, expectedLoad);
-
- vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
-
- int serverPort2 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
-
+
+ checkLocatorLoad(vm0, expected);
+
+ int serverPort2 = startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators);
+
ServerLocation expectedLocation2 = new ServerLocation(NetworkUtils.getServerHostName(vm0
.getHost()), serverPort2);
-
+
expected.put(expectedLocation2, expectedLoad);
- vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
+ checkLocatorLoad(vm0, expected);
}
-
+
/**
* Test that the locator will properly estimate the load for servers when
- * it receives connection requests.
+ * it receives connection requests.
*/
- public void testEstimation() throws IOException, ClassNotFoundException {
+ public void testEstimation() throws UnknownHostException, IOException, ClassNotFoundException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
-
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
+ startLocatorInVM(vm0, locatorPort, "");
String locators = getLocatorString(host, locatorPort);
-
- int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
-
- ServerLoad expectedLoad = new ServerLoad(2 / 800f, 1 / 800.0f, 0f, 1f);
+
+ int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
+
+ ServerLoad expectedLoad = new ServerLoad(2/800f, 1 / 800.0f, 0f, 1f);
ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort);
Map expected = new HashMap();
expected.put(expectedLocation, expectedLoad);
-
+
ClientConnectionResponse response;
response = (ClientConnectionResponse) TcpClient.requestToServer(InetAddress
- .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
+ .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000);
Assert.assertEquals(expectedLocation, response.getServer());
-
+
response = (ClientConnectionResponse) TcpClient.requestToServer(InetAddress
- .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
+ .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000, true);
Assert.assertEquals(expectedLocation, response.getServer());
-
+
//we expect that the connection load will be 2 * the loadPerConnection
- vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
-
+ checkLocatorLoad(vm0, expected);
+
QueueConnectionResponse response2;
response2 = (QueueConnectionResponse) TcpClient.requestToServer(InetAddress
- .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
+ .getByName(NetworkUtils.getServerHostName(host)), locatorPort,
new QueueConnectionRequest(null, 2,
Collections.EMPTY_SET, null, false), 10000, true);
Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers());
-
+
response2 = (QueueConnectionResponse) TcpClient
.requestToServer(InetAddress.getByName(NetworkUtils.getServerHostName(host)),
locatorPort, new QueueConnectionRequest(null, 5, Collections.EMPTY_SET, null,
false), 10000, true);
-
+
Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers());
//we expect that the queue load will increase by 2
expectedLoad.setSubscriptionConnectionLoad(2f);
- vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
+ checkLocatorLoad(vm0, expected);
}
-
+
/**
* Test to make sure the bridge servers communicate
* their updated load to the controller when the load
* on the bridge server changes.
- *
- * @throws Exception
+ * @throws Exception
*/
public void testLoadMessaging() throws Exception {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
-
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
+ startLocatorInVM(vm0, locatorPort, "");
String locators = getLocatorString(host, locatorPort);
-
- final int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
-
+
+ final int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
+
//We expect 0 load
Map expected = new HashMap();
ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort);
ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
expected.put(expectedLocation, expectedLoad);
- vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
-
+ checkLocatorLoad(vm0, expected);
+
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addServer(NetworkUtils.getServerHostName(host), serverPort);
pf.setMinConnections(8);
pf.setMaxConnections(8);
pf.setSubscriptionEnabled(true);
- vm2.invoke("StartBridgeClient", () -> startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME }));
-
+ startBridgeClientInVM(vm2, pf.getPoolAttributes(), new String[] {REGION_NAME});
+
//We expect 8 client to server connections. The queue requires
//an additional client to server connection, but that shouldn't show up here.
- expectedLoad = new ServerLoad(8 / 800f, 1 / 800.0f, 1f, 1f);
+ expectedLoad = new ServerLoad(8/800f, 1 / 800.0f, 1f, 1f);
expected.put(expectedLocation, expectedLoad);
-
- vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
-
+
+
+ checkLocatorLoad(vm0, expected);
+
stopBridgeMemberVM(vm2);
-
+
//Now we expect 0 load
expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f);
expected.put(expectedLocation, expectedLoad);
- vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
+ checkLocatorLoad(vm0, expected);
}
-
+
/**
* Test to make sure that the locator
* balances load between two servers.
- *
- * @throws Exception
+ * @throws Exception
*/
public void testBalancing() throws Exception {
final Host host = Host.getHost(0);
@@ -219,60 +228,87 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
-
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
+ startLocatorInVM(vm0, locatorPort, "");
String locators = getLocatorString(host, locatorPort);
-
- vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
- vm2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
-
+
+ startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators);
+ startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators);
+
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort);
pf.setMinConnections(80);
pf.setMaxConnections(80);
pf.setSubscriptionEnabled(false);
pf.setIdleTimeout(-1);
- vm3.invoke("StartBridgeClient", () -> startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME }));
-
- vm3.invoke("waitForPrefilledConnections", () -> waitForPrefilledConnections(80));
-
- vm1.invoke("check connection count", () -> checkConnectionCount(40));
- vm2.invoke("check connection count", () -> checkConnectionCount(40));
+ startBridgeClientInVM(vm3, pf.getPoolAttributes(), new String[] {REGION_NAME});
+
+ waitForPrefilledConnections(vm3, 80);
+
+ checkConnectionCount(vm1, 40);
+ checkConnectionCount(vm2, 40);
}
- private void checkConnectionCount(final int count) {
- Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
- final CacheServerImpl server = (CacheServerImpl)
- cache.getCacheServers().get(0);
- Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
- .timeout(300, TimeUnit.SECONDS).until(() -> {
- int sz = server.getAcceptor().getStats().getCurrentClientConnections();
- if (Math.abs(sz - count) <= ALLOWABLE_ERROR_IN_COUNT) {
- return true;
+ private void checkConnectionCount(VM vm, final int count) {
+ SerializableRunnableIF checkConnectionCount = new SerializableRunnable("checkConnectionCount") {
+ public void run() {
+ Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
+ final CacheServerImpl server = (CacheServerImpl)
+ cache.getCacheServers().get(0);
+ WaitCriterion wc = new WaitCriterion() {
+ String excuse;
+ public boolean done() {
+ int sz = server.getAcceptor().getStats()
+ .getCurrentClientConnections();
+ if (Math.abs(sz - count) <= ALLOWABLE_ERROR_IN_COUNT) {
+ return true;
+ }
+ excuse = "Found " + sz + " connections, expected " + count;
+ return false;
+ }
+ public String description() {
+ return excuse;
+ }
+ };
+ Wait.waitForCriterion(wc, 5 * 60 * 1000, 1000, true);
}
- System.out.println("Found " + sz + " connections, expected " + count);
- return false;
- });
+ };
+
+ vm.invoke(checkConnectionCount);
}
-
- private void waitForPrefilledConnections(final int count) throws Exception {
- waitForPrefilledConnections(count, POOL_NAME);
+
+ private void waitForPrefilledConnections(VM vm, final int count) throws Exception {
+ waitForPrefilledConnections(vm, count, POOL_NAME);
}
- private void waitForPrefilledConnections(final int count, final String poolName) throws Exception {
- final PoolImpl pool = (PoolImpl) PoolManager.getAll().get(poolName);
- Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
- .timeout(300, TimeUnit.SECONDS).until(() -> pool.getConnectionCount() >= count);
+ private void waitForPrefilledConnections(VM vm, final int count, final String poolName) throws Exception {
+ SerializableRunnable runnable = new SerializableRunnable("waitForPrefilledConnections") {
+ public void run() {
+ final PoolImpl pool = (PoolImpl) PoolManager.getAll().get(poolName);
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return pool.getConnectionCount() >= count;
+ }
+ public String description() {
+ return "connection count never reached " + count;
+ }
+ };
+ Wait.waitForCriterion(ev, MAX_WAIT, 200, true);
+ }
+ };
+ if(vm == null) {
+ runnable.run();
+ } else {
+ vm.invoke(runnable);
+ }
}
-
- /**
- * Test that the locator balances load between
+
+ /** Test that the locator balances load between
* three servers with intersecting server groups.
* Server: 1 2 3
* Groups: a a,b b
- *
- * @throws Exception
+ * @throws Exception
*/
public void testIntersectingServerGroups() throws Exception {
final Host host = Host.getHost(0);
@@ -280,158 +316,175 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
-
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
+ startLocatorInVM(vm0, locatorPort, "");
String locators = getLocatorString(host, locatorPort);
-
- int serverPort1 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a" }, locators));
- vm2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators));
- vm3.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "b" }, locators));
-
+
+ int serverPort1 = startBridgeServerInVM(vm1, new String[] {"a"}, locators);
+ startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators);
+ startBridgeServerInVM(vm3, new String[] {"b"}, locators);
+
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort);
pf.setMinConnections(12);
pf.setSubscriptionEnabled(false);
pf.setServerGroup("a");
pf.setIdleTimeout(-1);
- startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME });
- waitForPrefilledConnections(12);
-
- vm1.invoke("Check Connection Count", () -> checkConnectionCount(6));
- vm2.invoke("Check Connection Count", () -> checkConnectionCount(6));
- vm3.invoke("Check Connection Count", () -> checkConnectionCount(0));
-
+ startBridgeClientInVM(null, pf.getPoolAttributes(), new String[] {REGION_NAME});
+ waitForPrefilledConnections(null, 12);
+
+ checkConnectionCount(vm1, 6);
+ checkConnectionCount(vm2, 6);
+ checkConnectionCount(vm3, 0);
+
LogWriterUtils.getLogWriter().info("pool1 prefilled");
-
+
PoolFactoryImpl pf2 = (PoolFactoryImpl) PoolManager.createFactory();
pf2.init(pf.getPoolAttributes());
pf2.setServerGroup("b");
- PoolImpl pool2 = (PoolImpl) pf2.create("testPool2");
- waitForPrefilledConnections(12, "testPool2");
+ PoolImpl pool2= (PoolImpl) pf2.create("testPool2");
+ waitForPrefilledConnections(null, 12, "testPool2");
// The load will not be perfect, because we created all of the connections
//for group A first.
- vm1.invoke("Check Connection Count", () -> checkConnectionCount(6));
- vm2.invoke("Check Connection Count", () -> checkConnectionCount(9));
- vm3.invoke("Check Connection Count", () -> checkConnectionCount(9));
-
+ checkConnectionCount(vm1, 6);
+ checkConnectionCount(vm2, 9);
+ checkConnectionCount(vm3, 9);
+
LogWriterUtils.getLogWriter().info("pool2 prefilled");
-
+
ServerLocation location1 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort1);
PoolImpl pool1 = (PoolImpl) PoolManager.getAll().get(POOL_NAME);
Assert.assertEquals("a", pool1.getServerGroup());
-
+
//Use up all of the pooled connections on pool1, and acquire 3 more
- for (int i = 0; i < 15; i++) {
+ for(int i = 0; i < 15; i++) {
pool1.acquireConnection();
}
-
+
LogWriterUtils.getLogWriter().info("aquired 15 connections in pool1");
-
+
//now the load should be equal
- vm1.invoke("Check Connection Count", () -> checkConnectionCount(9));
- vm2.invoke("Check Connection Count", () -> checkConnectionCount(9));
- vm3.invoke("Check Connection Count", () -> checkConnectionCount(9));
-
+ checkConnectionCount(vm1, 9);
+ checkConnectionCount(vm2, 9);
+ checkConnectionCount(vm3, 9);
+
//use up all of the pooled connections on pool2
- for (int i = 0; i < 12; i++) {
+ for(int i = 0; i < 12; i++) {
pool2.acquireConnection();
}
-
+
LogWriterUtils.getLogWriter().info("aquired 12 connections in pool2");
-
+
//interleave creating connections in both pools
- for (int i = 0; i < 6; i++) {
+ for(int i = 0; i < 6; i++) {
pool1.acquireConnection();
pool2.acquireConnection();
}
-
+
LogWriterUtils.getLogWriter().info("interleaved 6 connections from pool1 with 6 connections from pool2");
-
+
//The load should still be balanced
- vm1.invoke("Check Connection Count", () -> checkConnectionCount(13));
- vm2.invoke("Check Connection Count", () -> checkConnectionCount(13));
- vm3.invoke("Check Connection Count", () -> checkConnectionCount(13));
-
+ checkConnectionCount(vm1, 13);
+ checkConnectionCount(vm2, 13);
+ checkConnectionCount(vm3, 13);
+
}
-
+
public void testCustomLoadProbe() throws Exception {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
- // VM vm3 = host.getVM(3);
-
+// VM vm3 = host.getVM(3);
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, ""));
+ startLocatorInVM(vm0, locatorPort, "");
String locators = getLocatorString(host, locatorPort);
-
- final ServerLoad load1 = new ServerLoad(.3f, .01f, .44f, 4564f);
- final ServerLoad load2 = new ServerLoad(23.2f, 1.1f, 22.3f, .3f);
- int serverPort1 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(null, locators, new String[] { REGION_NAME }, new MyLoadProbe(load1)));
- int serverPort2 = vm2.invoke("Start BridgeServer", () -> startBridgeServer(null, locators, new String[] { REGION_NAME }, new MyLoadProbe(load2)));
-
+
+ ServerLoad load1= new ServerLoad(.3f, .01f, .44f, 4564f);
+ ServerLoad load2= new ServerLoad(23.2f, 1.1f, 22.3f, .3f);
+ int serverPort1 = startBridgeServerInVM(vm1, null, locators, new String[] {REGION_NAME}, new MyLoadProbe(load1 ));
+ int serverPort2 = startBridgeServerInVM(vm2, null, locators, new String[] {REGION_NAME}, new MyLoadProbe(load2 ));
+
HashMap expected = new HashMap();
ServerLocation l1 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort1);
ServerLocation l2 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort2);
expected.put(l1, load1);
expected.put(l2, load2);
- vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
-
+ checkLocatorLoad(vm0, expected);
+
load1.setConnectionLoad(25f);
- vm1.invoke("changeLoad", () -> changeLoad(load1));
+ changeLoad(vm1, load1);
load2.setSubscriptionConnectionLoad(3.5f);
- vm2.invoke("changeLoad", () -> changeLoad(load2));
- vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
-
- final ServerLoad load1Updated = new ServerLoad(1f, .1f, 0f, 1f);
- final ServerLoad load2Updated = new ServerLoad(2f, 5f, 0f, 2f);
- expected.put(l1, load1Updated);
- expected.put(l2, load2Updated);
- vm1.invoke("changeLoad", () -> changeLoad(load1Updated));
- vm2.invoke("changeLoad", () -> changeLoad(load2Updated));
- vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected));
-
+ changeLoad(vm2, load2);
+ checkLocatorLoad(vm0, expected);
+
+ load1 = new ServerLoad(1f, .1f, 0f, 1f);
+ load2 = new ServerLoad(2f, 5f, 0f, 2f);
+ expected.put(l1, load1);
+ expected.put(l2, load2);
+ changeLoad(vm1, load1);
+ changeLoad(vm2, load2);
+ checkLocatorLoad(vm0, expected);
+
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort);
pf.setMinConnections(20);
pf.setSubscriptionEnabled(true);
pf.setIdleTimeout(-1);
- startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME });
- waitForPrefilledConnections(20);
-
+ startBridgeClientInVM(null, pf.getPoolAttributes(), new String[] {REGION_NAME});
+ waitForPrefilledConnections(null, 20);
+
//The first 10 connections should go to vm1, then 1 to vm2, then another 9 to vm1
//because the two servers have unequal values for loadPerConnection
- vm1.invoke("Check Connection Count", () -> checkConnectionCount(19));
- vm2.invoke("Check Connection Count", () -> checkConnectionCount(1));
+ checkConnectionCount(vm1, 19);
+ checkConnectionCount(vm2, 1);
}
-
- public void checkLocatorLoad(final Map expected) {
- List locators = Locator.getLocators();
- Assert.assertEquals(1, locators.size());
- InternalLocator locator = (InternalLocator) locators.get(0);
- final ServerLocator sl = locator.getServerLocatorAdvisee();
- InternalLogWriter log = new LocalLogWriter(InternalLogWriter.FINEST_LEVEL, System.out);
- sl.getDistributionAdvisor().dumpProfiles("PROFILES= ");
- Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
- .timeout(300, TimeUnit.SECONDS).until(() -> expected.equals(sl.getLoadMap()));
+
+ public void checkLocatorLoad(VM vm, final Map expected) {
+ vm.invoke(new SerializableRunnable() {
+ public void run() {
+ List locators = Locator.getLocators();
+ Assert.assertEquals(1, locators.size());
+ InternalLocator locator = (InternalLocator) locators.get(0);
+ final ServerLocator sl = locator.getServerLocatorAdvisee();
+ InternalLogWriter log = new LocalLogWriter(InternalLogWriter.FINEST_LEVEL, System.out);
+ sl.getDistributionAdvisor().dumpProfiles("PROFILES= ");
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return expected.equals(sl.getLoadMap());
+ }
+ public String description() {
+ return "load map never became equal to " + expected;
+ }
+ };
+ Wait.waitForCriterion(ev, MAX_WAIT, 200, true);
+ }
+ });
}
-
- private void changeLoad(final ServerLoad newLoad) {
- Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
- CacheServer server = cache.getCacheServers().get(0);
- MyLoadProbe probe = (MyLoadProbe) server.getLoadProbe();
- probe.setLoad(newLoad);
+
+ private void changeLoad(VM vm, final ServerLoad newLoad) {
+ vm.invoke(new SerializableRunnable() {
+
+ public void run() {
+ Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
+ CacheServer server = (CacheServer) cache.getCacheServers().get(0);
+ MyLoadProbe probe = (MyLoadProbe) server.getLoadProbe();
+ probe.setLoad(newLoad);
+ }
+
+ });
}
-
+
private static class MyLoadProbe extends ServerLoadProbeAdapter implements Serializable {
private ServerLoad load;
-
+
public MyLoadProbe(ServerLoad load) {
this.load = load;
}
-
+
public ServerLoad getLoad(ServerMetrics metrics) {
float connectionLoad = load.getConnectionLoad()
+ metrics.getConnectionCount() * load.getLoadPerConnection();
@@ -440,7 +493,7 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase {
return new ServerLoad(connectionLoad, load.getLoadPerConnection(),
queueLoad, load.getLoadPerSubscriptionConnection());
}
-
+
public void setLoad(ServerLoad load) {
this.load = load;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d1a0748b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java
index af5ba9c..2207e1d 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java
@@ -16,35 +16,52 @@
*/
package com.gemstone.gemfire.cache.client.internal;
-import com.gemstone.gemfire.cache.*;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolManager;
-import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.server.ServerLoadProbe;
+import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.PoolFactoryImpl;
-import com.gemstone.gemfire.test.dunit.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.*;
+import com.gemstone.gemfire.test.dunit.Assert;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
/**
*
*/
-public abstract class LocatorTestBase extends DistributedTestCase {
+public abstract class LocatorTestBase extends DistributedTestCase {
protected static final String CACHE_KEY = "CACHE";
protected static final String LOCATOR_KEY = "LOCATOR";
protected static final String REGION_NAME = "A_REGION";
protected static final String POOL_NAME = "daPool";
protected static final Object CALLBACK_KEY = "callback";
- /**
- * A map for storing temporary objects in a remote VM so that they can be used
+ /** A map for storing temporary objects in a remote VM so that they can be used
* between calls. Cleared after each test.
*/
protected static final HashMap remoteObjects = new HashMap();
@@ -52,211 +69,264 @@ public abstract class LocatorTestBase extends DistributedTestCase {
public LocatorTestBase(String name) {
super(name);
}
-
+
@Override
public final void preTearDown() throws Exception {
-
+
SerializableRunnable tearDown = new SerializableRunnable("tearDown") {
public void run() {
Locator locator = (Locator) remoteObjects.get(LOCATOR_KEY);
- if (locator != null) {
+ if(locator != null) {
try {
locator.stop();
- } catch (Exception e) {
+ } catch(Exception e) {
//do nothing
}
}
-
+
Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
- if (cache != null) {
+ if(cache != null) {
try {
cache.close();
- } catch (Exception e) {
+ } catch(Exception e) {
//do nothing
}
}
remoteObjects.clear();
-
+
}
};
//We seem to like leaving the DS open if we can for
//speed, but let's at least destroy our cache and locator.
Invoke.invokeInEveryVM(tearDown);
tearDown.run();
-
+
postTearDownLocatorTestBase();
}
-
+
protected void postTearDownLocatorTestBase() throws Exception {
}
+
+ protected void startLocatorInVM(final VM vm, final int locatorPort, final String otherLocators) { // starts a locator on locatorPort from a runnable handed to vm.invoke
+ vm.invoke(new SerializableRunnable("Create Locator") {
- protected void startLocator(final Host vmHost, final int locatorPort, final String otherLocators) {
- disconnectFromDS();
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
- props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators);
- props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
- props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
- File logFile = new File(getUniqueName() + "-locator" + locatorPort + ".log");
- try {
- InetAddress bindAddress = InetAddress.getByName(NetworkUtils.getServerHostName(vmHost));
- Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddress, props);
- remoteObjects.put(LOCATOR_KEY, locator);
- } catch (UnknownHostException uhe) {
- Assert.fail("While resolving bind address ", uhe);
- } catch (IOException ex) {
- Assert.fail("While starting locator on port " + locatorPort, ex);
- }
+ final String testName= getUniqueName(); // field initializer: evaluated when the runnable is constructed, i.e. before vm.invoke is called
+ public void run() {
+ disconnectFromDS();
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
+ props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators);
+ props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
+ props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ try {
+ File logFile = new File(testName + "-locator" + locatorPort
+ + ".log");
+ InetAddress bindAddr = null; // NOTE(review): stays null only if Assert.fail below returns normally; presumably it always throws - confirm
+ try {
+ bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vm.getHost())); // bind address is resolved from the host name of vm's host
+ } catch (UnknownHostException uhe) {
+ Assert.fail("While resolving bind address ", uhe);
+ }
+ Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props);
+ remoteObjects.put(LOCATOR_KEY, locator); // stored so stopLocatorInVM / preTearDown can look it up and stop it
+ } catch (IOException ex) {
+ Assert.fail("While starting locator on port " + locatorPort, ex);
+ }
+ }
+ });
}
-
- protected void stopLocator() {
- Locator locator = (Locator) remoteObjects.remove(LOCATOR_KEY);
- locator.stop();
+
+
+
+ protected void stopLocatorInVM(VM vm) { // stops the locator stored under LOCATOR_KEY, from a runnable handed to vm.invoke
+ vm.invoke(new SerializableRunnable("Stop Locator") {
+ public void run() {
+ Locator locator = (Locator) remoteObjects.remove(LOCATOR_KEY); // HashMap.remove returns null when nothing is stored under the key
+ locator.stop(); // NOTE(review): throws NullPointerException if no locator was stored - confirm callers always start one first
+ }
+ });
}
-
- protected int startBridgeServer(String[] groups, String locators) throws IOException {
- return startBridgeServer(groups, locators, new String[] { REGION_NAME });
+
+ protected int startBridgeServerInVM(VM vm, String[] groups, String locators) { // convenience overload: uses the single region REGION_NAME
+ return startBridgeServerInVM(vm, groups, locators, new String[] {REGION_NAME}); // returns whatever port the 4-arg overload reports
}
+
+ protected int addCacheServerInVM(VM vm, final String[] groups) { // adds one more cache server to the cache stored under CACHE_KEY and returns its port
+ SerializableCallable connect =
+ new SerializableCallable("Add Bridge server") {
- protected int addCacheServer(final String[] groups) throws IOException {
- Cache cache = (Cache) remoteObjects.get(CACHE_KEY);
- CacheServer server = cache.addCacheServer();
- server.setPort(0);
- server.setGroups(groups);
- server.start();
- return new Integer(server.getPort());
+ public Object call() throws Exception {
+ Cache cache = (Cache) remoteObjects.get(CACHE_KEY); // NOTE(review): null if no cache was stored earlier; the next line would then throw NullPointerException
+ CacheServer server = cache.addCacheServer();
+ final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); // NOTE(review): port is picked before start(); the removed code used setPort(0) and read back server.getPort()
+ server.setPort(serverPort);
+ server.setGroups(groups);
+ server.start();
+ return new Integer(serverPort); // the port that was requested, not one read back from the server
+ }
+ };
+ Integer port = (Integer) vm.invoke(connect); // value returned by call(), as handed back by vm.invoke
+ return port.intValue();
}
-
- protected int startBridgeServer(final String[] groups, final String locators, final String[] regions) throws IOException {
- return startBridgeServer(groups, locators, regions, CacheServer.DEFAULT_LOAD_PROBE);
+
+ protected int startBridgeServerInVM(VM vm, final String[] groups, final String locators, final String[] regions) { // overload that supplies the default load probe
+ return startBridgeServerInVM(vm, groups, locators, regions, CacheServer.DEFAULT_LOAD_PROBE); // delegates to the 5-arg overload
}
-
- protected int startBridgeServer(final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) throws IOException {
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
- props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
- DistributedSystem ds = getSystem(props);
- Cache cache = CacheFactory.create(ds);
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- factory.setEnableBridgeConflation(true);
- factory.setDataPolicy(DataPolicy.REPLICATE);
- RegionAttributes attributes = factory.create();
- for (int i = 0; i < regions.length; i++) {
- cache.createRegion(regions[i], attributes);
- }
- CacheServer server = cache.addCacheServer();
- server.setPort(0);
- server.setGroups(groups);
- server.setLoadProbe(probe);
- server.start();
-
- remoteObjects.put(CACHE_KEY, cache);
-
- return new Integer(server.getPort());
+
+ protected int startBridgeServerInVM(VM vm, final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) { // creates cache, regions and a cache server through vm.invoke; returns the server port
+ SerializableCallable connect =
+ new SerializableCallable("Start bridge server") {
+ public Object call() throws IOException {
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
+ props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
+ DistributedSystem ds = getSystem(props);
+ Cache cache = CacheFactory.create(ds);
+ AttributesFactory factory = new AttributesFactory(); // one attribute set, reused for every region created below
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setEnableBridgeConflation(true);
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ RegionAttributes attrs = factory.create();
+ for(int i = 0; i < regions.length; i++) { // one region per requested name
+ cache.createRegion(regions[i], attrs);
+ }
+ CacheServer server = cache.addCacheServer();
+ final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); // NOTE(review): port is picked before start(), so it could presumably be taken in between - confirm
+ server.setPort(serverPort);
+ server.setGroups(groups);
+ server.setLoadProbe(probe);
+ server.start();
+
+ remoteObjects.put(CACHE_KEY, cache); // stored so later calls (e.g. stopBridgeMemberVM, preTearDown) can find and close it
+
+ return new Integer(serverPort); // the port that was requested, not one read back from the server
+ }
+ };
+ Integer port = (Integer) vm.invoke(connect); // value returned by call(), as handed back by vm.invoke
+ return port.intValue();
}
-
- protected int startBridgeServerWithEmbeddedLocator(final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe)
- throws IOException {
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
- props.setProperty(DistributionConfig.START_LOCATOR_NAME, locators);
- props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
- DistributedSystem ds = getSystem(props);
- Cache cache = CacheFactory.create(ds);
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- factory.setEnableBridgeConflation(true);
- factory.setDataPolicy(DataPolicy.REPLICATE);
- RegionAttributes attrs = factory.create();
- for (int i = 0; i < regions.length; i++) {
- cache.createRegion(regions[i], attrs);
- }
- CacheServer server = cache.addCacheServer();
- server.setGroups(groups);
- server.setLoadProbe(probe);
- server.setPort(0);
- server.start();
-
- remoteObjects.put(CACHE_KEY, cache);
-
- return new Integer(server.getPort());
+
+ protected int startBridgeServerWithEmbeddedLocator(VM vm, final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) { // like the 5-arg startBridgeServerInVM, but also sets START_LOCATOR_NAME
+ SerializableCallable connect =
+ new SerializableCallable("Start bridge server") {
+ public Object call() throws IOException {
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
+ props.setProperty(DistributionConfig.START_LOCATOR_NAME, locators); // the same locators string is used for START_LOCATOR_NAME and LOCATORS_NAME
+ props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
+ DistributedSystem ds = getSystem(props);
+ Cache cache = CacheFactory.create(ds);
+ AttributesFactory factory = new AttributesFactory(); // one attribute set, reused for every region created below
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setEnableBridgeConflation(true);
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ RegionAttributes attrs = factory.create();
+ for(int i = 0; i < regions.length; i++) { // one region per requested name
+ cache.createRegion(regions[i], attrs);
+ }
+ CacheServer server = cache.addCacheServer();
+ server.setGroups(groups);
+ server.setLoadProbe(probe);
+ final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); // NOTE(review): port is picked before start(), so it could presumably be taken in between - confirm
+ server.setPort(serverPort);
+ server.start();
+
+ remoteObjects.put(CACHE_KEY, cache); // stored so later calls (e.g. stopBridgeMemberVM, preTearDown) can find and close it
+
+ return new Integer(serverPort); // the port that was requested, not one read back from the server
+ }
+ };
+ Integer port = (Integer) vm.invoke(connect); // value returned by call(), as handed back by vm.invoke
+ return port.intValue();
}
-
- protected void startBridgeClient(final String group, final String host, final int port) throws Exception {
- startBridgeClient(group, host, port, new String[] { REGION_NAME });
+
+ protected void startBridgeClientInVM(VM vm, final String group, final String host, final int port) throws Exception { // convenience overload: uses the single region REGION_NAME
+ startBridgeClientInVM(vm, group, host, port, new String[] {REGION_NAME}); // delegates to the overload that takes a region list
}
+
- protected void startBridgeClient(final String group, final String host, final int port, final String[] regions) throws Exception {
+ protected void startBridgeClientInVM(VM vm, final String group, final String host, final int port, final String[] regions) throws Exception { // builds pool attributes in the calling JVM, then delegates to the Pool overload
PoolFactoryImpl pf = new PoolFactoryImpl(null);
pf.addLocator(host, port)
- .setServerGroup(group)
- .setPingInterval(200)
- .setSubscriptionEnabled(true)
- .setSubscriptionRedundancy(-1);
- startBridgeClient(pf.getPoolAttributes(), regions);
+ .setServerGroup(group)
+ .setPingInterval(200)
+ .setSubscriptionEnabled(true)
+ .setSubscriptionRedundancy(-1);
+ startBridgeClientInVM(vm, pf.getPoolAttributes(), regions); // vm is passed through unchanged; the Pool overload decides where the client is started
}
-
- protected void startBridgeClient(final Pool pool, final String[] regions) throws Exception {
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
- props.setProperty(DistributionConfig.LOCATORS_NAME, "");
- DistributedSystem ds = getSystem(props);
- Cache cache = CacheFactory.create(ds);
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.LOCAL);
- // factory.setEnableBridgeConflation(true);
- // factory.setDataPolicy(DataPolicy.NORMAL);
- factory.setPoolName(POOL_NAME);
- PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory();
- pf.init(pool);
- LocatorDiscoveryCallback locatorCallback = new MyLocatorCallback();
- remoteObjects.put(CALLBACK_KEY, locatorCallback);
- pf.setLocatorDiscoveryCallback(locatorCallback);
- pf.create(POOL_NAME);
-
- RegionAttributes attrs = factory.create();
- for (int i = 0; i < regions.length; i++) {
- cache.createRegion(regions[i], attrs);
+
+ protected void startBridgeClientInVM(VM vm, final Pool pool, final String[] regions) throws Exception { // creates a client cache with pool POOL_NAME and the given regions, in vm or (when vm is null) in the caller
+ SerializableRunnable connect =
+ new SerializableRunnable("Start bridge client") {
+ public void run() {
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
+ props.setProperty(DistributionConfig.LOCATORS_NAME, ""); // empty locators value for the client's own distributed system
+ DistributedSystem ds = getSystem(props);
+ Cache cache = CacheFactory.create(ds);
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+// factory.setEnableBridgeConflation(true);
+// factory.setDataPolicy(DataPolicy.NORMAL);
+ factory.setPoolName(POOL_NAME); // regions use the pool that is created under the same name below
+ PoolFactoryImpl pf= (PoolFactoryImpl) PoolManager.createFactory();
+ pf.init(pool); // presumably copies the supplied pool's settings into the factory - confirm against PoolFactoryImpl
+ LocatorDiscoveryCallback locatorCallback = new MyLocatorCallback();
+ remoteObjects.put(CALLBACK_KEY, locatorCallback); // kept under CALLBACK_KEY so later code can retrieve the callback
+ pf.setLocatorDiscoveryCallback(locatorCallback);
+ pf.create(POOL_NAME);
+
+
+ RegionAttributes attrs = factory.create();
+ for(int i = 0; i < regions.length; i++) { // one region per requested name, all with the same attributes
+ cache.createRegion(regions[i], attrs);
+ }
+
+ remoteObjects.put(CACHE_KEY, cache); // stored so stopBridgeMemberVM / preTearDown can find and close it
+ }
+ };
+
+ if(vm == null) { // null vm: execute the runnable directly in the calling thread
+ connect.run();
+ } else {
+ vm.invoke(connect);
}
-
- remoteObjects.put(CACHE_KEY, cache);
}
-
+
protected void stopBridgeMemberVM(VM vm) {
- vm.invoke(new SerializableRunnable("Stop bridge member") {
- public void run() {
- Cache cache = (Cache) remoteObjects.remove(CACHE_KEY);
- cache.close();
- disconnectFromDS();
- }
- });
+ vm.invoke(new SerializableRunnable("Stop bridge member") { // closes the stored cache and disconnects, from a runnable handed to vm.invoke
+ public void run() {
+ Cache cache = (Cache) remoteObjects.remove(CACHE_KEY); // HashMap.remove returns null when nothing is stored under the key
+ cache.close(); // NOTE(review): throws NullPointerException if no cache was stored - confirm callers always start one first
+ disconnectFromDS();
+ }
+ });
}
-
+
public String getLocatorString(Host host, int locatorPort) {
- return getLocatorString(host, new int[] { locatorPort });
+ return getLocatorString(host, new int[] {locatorPort}); // single-port convenience: wraps the port in an array and delegates to the int[] overload
}
-
+
public String getLocatorString(Host host, int[] locatorPorts) {
StringBuffer str = new StringBuffer();
- for (int i = 0; i < locatorPorts.length; i++) {
+ for(int i = 0; i < locatorPorts.length; i++) { // appends one "hostname[port]" entry per port, all with the same host name
str.append(NetworkUtils.getServerHostName(host))
.append("[")
.append(locatorPorts[i])
.append("]");
- if (i < locatorPorts.length - 1) {
+ if(i < locatorPorts.length - 1) { // comma only between entries, never after the last one
str.append(",");
}
}
-
+
return str.toString();
}
-
+
protected static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter {
private final Set discoveredLocators = new HashSet();
private final Set removedLocators = new HashSet();
-
+
public synchronized void locatorsDiscovered(List locators) {
discoveredLocators.addAll(locators);
notifyAll();
@@ -266,29 +336,29 @@ public abstract class LocatorTestBase extends DistributedTestCase {
removedLocators.addAll(locators);
notifyAll();
}
-
+
public boolean waitForDiscovery(InetSocketAddress locator, long time) throws InterruptedException {
return waitFor(discoveredLocators, locator, time);
}
-
+
public boolean waitForRemove(InetSocketAddress locator, long time) throws InterruptedException {
return waitFor(removedLocators, locator, time);
}
-
+
private synchronized boolean waitFor(Set set, InetSocketAddress locator, long time) throws InterruptedException {
long remaining = time;
long endTime = System.currentTimeMillis() + time;
- while (!set.contains(locator) && remaining >= 0) {
+ while(!set.contains(locator) && remaining >= 0) { // NOTE(review): remaining == 0 passes this test, and Object.wait(0) waits with no timeout until notified
wait(remaining);
- remaining = endTime - System.currentTimeMillis();
+ remaining = endTime - System.currentTimeMillis(); // time left until the deadline; goes negative once it has passed, which ends the loop
}
return set.contains(locator);
}
-
+
public synchronized Set getDiscovered() {
return new HashSet(discoveredLocators);
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d1a0748b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java
index 0f08456..de43c29 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java
@@ -56,11 +56,11 @@ public class Bug47667DUnitTest extends LocatorTestBase {
final int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
final String locatorHost = NetworkUtils.getServerHostName(host);
- locator.invoke("Start Locator",() ->startLocator(locator.getHost(), locatorPort, ""));
+ startLocatorInVM(locator, locatorPort, "");
String locString = getLocatorString(host, locatorPort);
- server1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"R1"}, locString, new String[] {"R1"}));
- server2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"R2"}, locString, new String[] {"R2"}));
+ startBridgeServerInVM(server1, new String[] {"R1"}, locString, new String[] {"R1"});
+ startBridgeServerInVM(server2, new String[] {"R2"}, locString, new String[] {"R2"});
client.invoke(new SerializableCallable() {
@Override
@@ -70,15 +70,15 @@ public class Bug47667DUnitTest extends LocatorTestBase {
ClientCache cache = ccf.create();
PoolManager.createFactory().addLocator(locatorHost, locatorPort).setServerGroup("R1").create("R1");
PoolManager.createFactory().addLocator(locatorHost, locatorPort).setServerGroup("R2").create("R2");
- Region region1 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R1").create("R1");
- Region region2 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R2").create("R2");
- CacheTransactionManager transactionManager = cache.getCacheTransactionManager();
- transactionManager.begin();
- region1.put(1, "value1");
- transactionManager.commit();
- transactionManager.begin();
- region2.put(2, "value2");
- transactionManager.commit();
+ Region r1 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R1").create("R1");
+ Region r2 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R2").create("R2");
+ CacheTransactionManager mgr = cache.getCacheTransactionManager();
+ mgr.begin();
+ r1.put(1, "value1");
+ mgr.commit();
+ mgr.begin();
+ r2.put(2, "value2");
+ mgr.commit();
return null;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d1a0748b/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java
index 167cc3a..d784397 100644
--- a/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java
@@ -16,9 +16,26 @@
*/
package com.gemstone.gemfire.management;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Properties;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServer;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.client.internal.LocatorTestBase;
-import com.gemstone.gemfire.cache.query.*;
+import com.gemstone.gemfire.cache.query.IndexExistsException;
+import com.gemstone.gemfire.cache.query.IndexInvalidException;
+import com.gemstone.gemfire.cache.query.IndexNameConflictException;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
import com.gemstone.gemfire.cache.query.cq.dunit.CqQueryDUnitTest;
import com.gemstone.gemfire.cache.query.internal.cq.CqService;
import com.gemstone.gemfire.cache.server.CacheServer;
@@ -29,34 +46,37 @@ import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.management.internal.JmxManagerLocatorRequest;
+import com.gemstone.gemfire.management.internal.JmxManagerLocatorResponse;
import com.gemstone.gemfire.management.internal.MBeanJMXAdapter;
import com.gemstone.gemfire.management.internal.SystemManagementService;
-import com.gemstone.gemfire.test.dunit.*;
-
-import javax.management.*;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collections;
-import java.util.Properties;
+import com.gemstone.gemfire.test.dunit.Assert;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
/**
* Cache Server related management test cases
+ *
+ *
*/
public class CacheServerManagementDUnitTest extends LocatorTestBase {
private static final long serialVersionUID = 1L;
-
- private static int CONNECT_LOCATOR_TIMEOUT_MS = 30000;
+
+ private static int CONNECT_LOCATOR_TIMEOUT_MS = 30000;
private ManagementTestBase helper;
private static final String queryName = "testClientWithFeederAndCQ_0";
private static final String indexName = "testIndex";
-
+
private static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer;
+
protected CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest(
"CqDataDUnitTest");
@@ -64,7 +84,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
public CacheServerManagementDUnitTest(String name) {
super(name);
this.helper = new ManagementTestBase(name);
-
+
}
@Override
@@ -78,6 +98,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
}
/**
+ *
* @throws Exception
*/
public void testCacheServerMBean() throws Exception {
@@ -91,11 +112,12 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
helper.startManagingNode(managingNode);
//helper.createCache(server);
int serverPort = AvailablePortHelper.getRandomAvailableTCPPort();
- cqDUnitTest.createServer(server, serverPort);
-
+ cqDUnitTest.createServer(server,serverPort);
+
+
DistributedMember member = helper.getMember(server);
-
- verifyCacheServer(server, serverPort);
+
+ verifyCacheServer(server,serverPort);
final int port = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
@@ -123,10 +145,10 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
// Close.
Wait.pause(2000);
- checkNavigation(managingNode, member, serverPort);
- verifyIndex(server, serverPort);
+ checkNavigation(managingNode,member,serverPort);
+ verifyIndex(server,serverPort);
// This will test all CQs and will close the cq in its final step
- verifyCacheServerRemote(managingNode, member, serverPort);
+ verifyCacheServerRemote(managingNode, member,serverPort);
verifyClosedCQ(server);
@@ -140,30 +162,30 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
/**
* Test for client server connection related management artifacts
- * like notifications
- *
+ * like notifications
* @throws Exception
*/
-
+
public void testCacheClient() throws Exception {
-
+
final Host host = Host.getHost(0);
VM locator = host.getVM(0);
VM server = host.getVM(1);
VM client = host.getVM(2);
-
+
int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, ""));
-
- String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]";
-
- int serverPort = server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators));
-
- addClientNotifListener(server, serverPort);
+ startLocatorInVM(locator, locatorPort, "");
+
+ String locators = NetworkUtils.getServerHostName(locator.getHost())+ "[" + locatorPort + "]";
+
+
+ int serverPort = startBridgeServerInVM(server, null, locators);
+
+ addClientNotifListener(server,serverPort);
// Start a client and make sure that proper notification is received
- client.invoke("Start BridgeClient", () -> startBridgeClient(null, NetworkUtils.getServerHostName(locator.getHost()), locatorPort));
-
+ startBridgeClientInVM(client, null, NetworkUtils.getServerHostName(locator.getHost()), locatorPort);
+
//stop the client and make sure the bridge server notifies
stopBridgeMemberVM(client);
helper.closeCache(locator);
@@ -171,14 +193,13 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
helper.closeCache(client);
}
-
+
/**
* Intention of this test is to check if a node becomes manager after all the nodes are alive
* it should have all the information of all the members.
- * <p>
+ *
* Thats why used service.getLocalManager().runManagementTaskAdhoc() to make node
* ready for federation when manager node comes up
- *
* @throws Exception
*/
@@ -187,72 +208,96 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
final Host host = Host.getHost(0);
VM locator = host.getVM(0);
VM server = host.getVM(1);
-
+
//Step 1:
final int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, ""));
-
- String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]";
+ startLocator(locator, locatorPort, "");
+ String locators = NetworkUtils.getServerHostName(locator.getHost())+ "[" + locatorPort + "]";
+
//Step 2:
- server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators));
-
+ int serverPort = startBridgeServerInVM(server, null, locators);
+
//Step 3:
- server.invoke("Check Server", () -> {
- Cache cache = GemFireCacheImpl.getInstance();
- assertNotNull(cache);
- SystemManagementService service = (SystemManagementService) ManagementService
- .getExistingManagementService(cache);
- assertNotNull(service);
- assertFalse(service.isManager());
- assertNotNull(service.getMemberMXBean());
- service.getLocalManager().runManagementTaskAdhoc();
+ server.invoke(new SerializableRunnable("Check Server") {
+
+ public void run() {
+ Cache cache = GemFireCacheImpl.getInstance();
+ assertNotNull(cache);
+ SystemManagementService service = (SystemManagementService)ManagementService
+ .getExistingManagementService(cache);
+ assertNotNull(service);
+ assertFalse(service.isManager());
+ assertNotNull(service.getMemberMXBean());
+ service.getLocalManager().runManagementTaskAdhoc();
+
+
+ }
});
+
+ //Step 4:
+ JmxManagerLocatorResponse locRes = JmxManagerLocatorRequest.send(locator
+ .getHost().getHostName(), locatorPort, CONNECT_LOCATOR_TIMEOUT_MS, Collections.<String, String> emptyMap());
+
+ //Step 5:
+ locator.invoke(new SerializableRunnable("Check locator") {
- //Step 4:
- JmxManagerLocatorRequest.send(locator
- .getHost().getHostName(), locatorPort, CONNECT_LOCATOR_TIMEOUT_MS, Collections.<String, String>emptyMap());
-
- //Step 5:
- locator.invoke("Check locator", () -> {
- Cache cache = GemFireCacheImpl.getInstance();
- assertNotNull(cache);
- ManagementService service = ManagementService
- .getExistingManagementService(cache);
- assertNotNull(service);
- assertTrue(service.isManager());
- LocatorMXBean bean = service.getLocalLocatorMXBean();
- assertEquals(locatorPort, bean.getPort());
- DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
-
- assertEquals(2, dsBean.listMemberObjectNames().length);
+ public void run() {
+ Cache cache = GemFireCacheImpl.getInstance();
+ assertNotNull(cache);
+ ManagementService service = ManagementService
+ .getExistingManagementService(cache);
+ assertNotNull(service);
+ assertTrue(service.isManager());
+ LocatorMXBean bean = service.getLocalLocatorMXBean();
+ assertEquals(locatorPort, bean.getPort());
+ DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
+ ObjectName[] names = dsBean.listMemberObjectNames();
+
+ assertEquals(2,dsBean.listMemberObjectNames().length);
+
+ }
});
+
+
helper.closeCache(locator);
helper.closeCache(server);
-
+
+
}
+
+
+ protected void startLocator(final VM vm, final int locatorPort, final String otherLocators) { // starts a locator on locatorPort from a runnable handed to vm.invoke
+ vm.invoke(new SerializableRunnable("Create Locator") {
- protected void startLocator(Host vmHost, final int locatorPort, final String otherLocators) {
- disconnectFromDS();
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
- props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators);
- props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
- props.setProperty(DistributionConfig.JMX_MANAGER_HTTP_PORT_NAME, "0");
- props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
- File logFile = new File(getUniqueName() + "-locator" + locatorPort + ".log");
- try {
- InetAddress bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vmHost));
- Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props);
- remoteObjects.put(LOCATOR_KEY, locator);
- } catch (UnknownHostException uhe) {
- Assert.fail("While resolving bind address ", uhe);
- } catch (IOException ex) {
- Assert.fail("While starting locator on port " + locatorPort, ex);
- }
+ final String testName= getUniqueName(); // field initializer: evaluated when the runnable is constructed, i.e. before vm.invoke is called
+ public void run() {
+ disconnectFromDS();
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0));
+ props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators);
+ props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
+ props.setProperty(DistributionConfig.JMX_MANAGER_HTTP_PORT_NAME, "0"); // the one property set here that LocatorTestBase.startLocatorInVM does not set
+ props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ try {
+ File logFile = new File(testName + "-locator" + locatorPort
+ + ".log");
+ InetAddress bindAddr = null; // NOTE(review): stays null only if Assert.fail below returns normally; presumably it always throws - confirm
+ try {
+ bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vm.getHost())); // bind address is resolved from the host name of vm's host
+ } catch (UnknownHostException uhe) {
+ Assert.fail("While resolving bind address ", uhe);
+ }
+ Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props);
+ remoteObjects.put(LOCATOR_KEY, locator); // stored under LOCATOR_KEY in the inherited remoteObjects map
+ } catch (IOException ex) {
+ Assert.fail("While starting locator on port " + locatorPort, ex);
+ }
+ }
+ });
}
-
+
protected void checkNavigation(final VM vm,
final DistributedMember cacheServerMember, final int serverPort) {
SerializableRunnable checkNavigation = new SerializableRunnable(
@@ -282,14 +327,14 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
};
vm.invoke(checkNavigation);
}
-
+
/**
* Verify the Cache Server details
- *
+ *
* @param vm
*/
@SuppressWarnings("serial")
- protected void addClientNotifListener(final VM vm, final int serverPort) throws Exception {
+ protected void addClientNotifListener(final VM vm , final int serverPort) throws Exception {
SerializableRunnable addClientNotifListener = new SerializableRunnable(
"Add Client Notif Listener") {
public void run() {
@@ -314,19 +359,18 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
TestCacheServerNotif nt = new TestCacheServerNotif();
try {
mbeanServer.addNotificationListener(MBeanJMXAdapter
- .getClientServiceMBeanName(serverPort, cache.getDistributedSystem().getMemberId()), nt, null, null);
+ .getClientServiceMBeanName(serverPort,cache.getDistributedSystem().getMemberId()), nt, null, null);
} catch (InstanceNotFoundException e) {
fail("Failed With Exception " + e);
}
-
+
}
};
vm.invoke(addClientNotifListener);
}
-
/**
* Verify the closed CQ which is closed from Managing Node
- *
+ *
* @param vm
*/
@SuppressWarnings("serial")
@@ -361,7 +405,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
bean.removeIndex(indexName);
} catch (Exception e) {
fail("Failed With Exception " + e);
-
+
}
assertEquals(bean.getIndexCount(), 0);
@@ -372,7 +416,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
/**
* Verify the closed CQ which is closed from Managing Node
- *
+ *
* @param vm
*/
@SuppressWarnings("serial")
@@ -390,9 +434,11 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
vm.invoke(verifyClosedCQ);
}
+
+
/**
* Verify the Cache Server details
- *
+ *
* @param vm
*/
@SuppressWarnings("serial")
@@ -450,7 +496,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
/**
* Verify the Cache Server details
- *
+ *
* @param vm
*/
@SuppressWarnings("serial")
@@ -476,14 +522,14 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
LogWriterUtils.getLogWriter().info(
"<ExpectedString> Active Query Count "
+ bean.getActiveCQCount() + "</ExpectedString> ");
-
+
LogWriterUtils.getLogWriter().info(
"<ExpectedString> Registered Query Count "
+ bean.getRegisteredCQCount() + "</ExpectedString> ");
- assertTrue(bean.showAllClientStats()[0].getClientCQCount() == 1);
- int numQueues = bean.getNumSubscriptions();
- assertEquals(numQueues, 1);
+ assertTrue(bean.showAllClientStats()[0].getClientCQCount() == 1);
+ int numQueues = bean.getNumSubscriptions();
+ assertEquals(numQueues, 1);
// test for client connection Count
/* @TODO */
@@ -506,9 +552,11 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase {
};
vm.invoke(verifyCacheServerRemote);
}
-
+
/**
* Notification handler
+ *
+ *
*/
private static class TestCacheServerNotif implements
NotificationListener {