You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/17 05:28:25 UTC
[5/7] incubator-geode git commit: GEODE-870: test case fixes.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9cf7ea2c/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java
index 1cb23a1..9c080f3 100644
--- a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java
@@ -81,25 +81,25 @@ import com.gemstone.gemfire.test.dunit.WaitCriterion;
* This class tests the ContiunousQuery mechanism in GemFire.
* It does so by creating a cache server with a cache and a pre-defined region and
* a data loader. The client creates the same region and attaches the connection pool.
- *
+ *
*
* @author anil
*/
public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
-
+
/** The port on which the bridge server was started in this VM */
private static int bridgeServerPort;
-
+
protected static int port = 0;
protected static int port2 = 0;
-
+
public static int noTest = -1;
-
+
public final String[] regions = new String[] {
"regionA",
"regionB"
};
-
+
private final static int CREATE = 0;
private final static int UPDATE = 1;
private final static int DESTROY = 2;
@@ -113,20 +113,20 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
static private final String WAIT_PROPERTY = "CqQueryTest.maxWaitTime";
static private final int WAIT_DEFAULT = (20 * 1000);
-
- public static final long MAX_TIME = Integer.getInteger(WAIT_PROPERTY,
+
+ public static final long MAX_TIME = Integer.getInteger(WAIT_PROPERTY,
WAIT_DEFAULT).intValue();
public final String[] cqs = new String [] {
//0 - Test for ">"
"SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0",
-
+
//1 - Test for "=" and "and".
"SELECT ALL * FROM /root/" + regions[0] + " p where p.ID = 2 and p.status='active'",
-
+
//2 - Test for "<" and "and".
"SELECT ALL * FROM /root/" + regions[1] + " p where p.ID < 5 and p.status='active'",
-
+
// FOLLOWING CQS ARE NOT TESTED WITH VALUES; THEY ARE USED TO TEST PARSING LOGIC WITHIN CQ.
//3
"SELECT * FROM /root/" + regions[0] + " ;",
@@ -148,21 +148,21 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
"SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0 and p.status='active'",
//11 - Test for "No Alias"
"SELECT ALL * FROM /root/" + regions[0] + " where ID > 0",
-
+
};
-
+
private String[] invalidCQs = new String [] {
// Test for ">"
"SELECT ALL * FROM /root/invalidRegion p where p.ID > 0"
};
-
+
public CqQueryUsingPoolDUnitTest(String name) {
super(name);
}
-
+
public void setUp() throws Exception {
super.setUp();
-
+
//We're seeing this on the server when the client
//disconnects.
IgnoredException.addIgnoredException("Connection reset");
@@ -178,14 +178,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
getSystem();
}
});
-
+
}
-
+
/* Returns Cache Server Port */
static int getCacheServerPort() {
return bridgeServerPort;
}
-
+
/* Create Cache Server */
public void createServer(VM server)
{
@@ -201,7 +201,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
MirrorType mirrorType = MirrorType.KEYS_VALUES;
createServer(server, thePort, eviction, mirrorType);
}
-
+
public void createServer(VM server, final int thePort, final boolean eviction, final MirrorType mirrorType)
{
SerializableRunnable createServer = new CacheSerializableRunnable(
@@ -223,7 +223,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
for (int i = 0; i < regions.length; i++) {
createRegion(regions[i], factory.createRegionAttributes());
}
-
+
try {
startBridgeServer(thePort, true);
}
@@ -231,13 +231,13 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
catch (Exception ex) {
Assert.fail("While starting CacheServer", ex);
}
-
+
}
};
server.invoke(createServer);
}
-
+
/**
* Create a bridge server with partitioned region.
* @param server VM where to create the bridge server.
@@ -255,7 +255,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
//AttributesFactory factory = new AttributesFactory();
//factory.setScope(Scope.DISTRIBUTED_ACK);
//factory.setMirrorType(MirrorType.KEYS_VALUES);
-
+
//int maxMem = 0;
AttributesFactory attr = new AttributesFactory();
//attr.setValueConstraint(valueConstraint);
@@ -265,7 +265,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
PartitionAttributes prAttr = paf.setTotalNumBuckets(197).setRedundantCopies(redundantCopies).create();
attr.setPartitionAttributes(prAttr);
-
+
assertFalse(getSystem().isLoner());
//assertTrue(getSystem().getDistributionManager().getOtherDistributionManagerIds().size() > 0);
for (int i = 0; i < regions.length; i++) {
@@ -278,14 +278,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
catch (Exception ex) {
Assert.fail("While starting CacheServer", ex);
}
-
+
}
};
server.invoke(createServer);
}
-
-
+
+
/* Close Cache Server */
public void closeServer(VM server) {
server.invoke(new SerializableRunnable("Close CacheServer") {
@@ -295,15 +295,15 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
});
}
-
+
/* Create Client */
public void createClient(VM client, final int serverPort, final String serverHost) {
int[] serverPorts = new int[] {serverPort};
- createClient(client, serverPorts, serverHost, null, null);
+ createClient(client, serverPorts, serverHost, null, null);
}
-
+
/* Create Client */
- public void createClient(VM client, final int[] serverPorts, final String serverHost, final String redundancyLevel,
+ public void createClient(VM client, final int[] serverPorts, final String serverHost, final String redundancyLevel,
final String poolName) {
SerializableRunnable createQService =
new CacheSerializableRunnable("Create Client") {
@@ -317,10 +317,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
-
+
AttributesFactory regionFactory = new AttributesFactory();
regionFactory.setScope(Scope.LOCAL);
-
+
if (poolName != null) {
regionFactory.setPoolName(poolName);
} else {
@@ -330,14 +330,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
ClientServerTestCase.configureConnectionPool(regionFactory, serverHost,serverPorts, true, -1, -1, null);
}
}
- for (int i=0; i < regions.length; i++) {
+ for (int i=0; i < regions.length; i++) {
createRegion(regions[i], regionFactory.createRegionAttributes());
LogWriterUtils.getLogWriter().info("### Successfully Created Region on Client :" + regions[i]);
//region1.getAttributesMutator().setCacheListener(new CqListener());
}
}
};
-
+
client.invoke(createQService);
}
@@ -353,10 +353,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
} catch (Exception ex) {
LogWriterUtils.getLogWriter().info("### Failed to get CqService during ClientClose() ###");
}
-
+
}
};
-
+
client.invoke(closeCQService);
Wait.pause(1000);
}
@@ -417,13 +417,13 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
LogWriterUtils.getLogWriter().info("### Number of Entries In Region after Delete :" + region1.keys().size());
}
-
+
});
}
-
+
/**
* support for invalidating values.
- */
+ */
public void invalidateValues(VM vm, final String regionName, final int size) {
vm.invoke(new CacheSerializableRunnable("Create values") {
public void run2() throws CacheException {
@@ -433,41 +433,41 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
LogWriterUtils.getLogWriter().info("### Number of Entries In Region after Delete :" + region1.keys().size());
}
-
+
});
}
public void createPool(VM vm, String poolName, String server, int port) {
- createPool(vm, poolName, new String[]{server}, new int[]{port});
+ createPool(vm, poolName, new String[]{server}, new int[]{port});
}
public void createPool(VM vm, final String poolName, final String[] servers, final int[] ports) {
createPool(vm, poolName, servers, ports, null);
}
-
+
public void createPool(VM vm, final String poolName, final String[] servers, final int[] ports, final String redundancyLevel) {
vm.invoke(new CacheSerializableRunnable("createPool :" + poolName) {
public void run2() throws CacheException {
// Create Cache.
getCache();
IgnoredException.addIgnoredException("java.net.ConnectException");
-
+
PoolFactory cpf = PoolManager.createFactory();
cpf.setSubscriptionEnabled(true);
if (redundancyLevel != null){
int redundancy = Integer.parseInt(redundancyLevel);
cpf.setSubscriptionRedundancy(redundancy);
- }
-
+ }
+
for (int i=0; i < servers.length; i++){
LogWriterUtils.getLogWriter().info("### Adding to Pool. ### Server : " + servers[i] + " Port : " + ports[i]);
cpf.addServer(servers[i], ports[i]);
}
-
+
cpf.create(poolName);
}
- });
+ });
}
/* Register CQs */
@@ -477,7 +477,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
//pause(60 * 1000);
//getLogWriter().info("### DEBUG CREATE CQ START ####");
//pause(20 * 1000);
-
+
LogWriterUtils.getLogWriter().info("### Create CQ. ###" + cqName);
// Get CQ Service.
QueryService qService = null;
@@ -490,10 +490,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
CqAttributesFactory cqf = new CqAttributesFactory();
CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
((CqQueryTestListener)cqListeners[0]).cqName = cqName;
-
+
cqf.initCqListeners(cqListeners);
CqAttributes cqa = cqf.create();
-
+
// Create CQ.
try {
CqQuery cq1 = qService.newCq(cqName, queryStr, cqa);
@@ -505,7 +505,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
throw err;
}
}
- });
+ });
}
// REMOVE..........
@@ -515,7 +515,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
//pause(60 * 1000);
//getLogWriter().info("### DEBUG CREATE CQ START ####");
//pause(20 * 1000);
-
+
LogWriterUtils.getLogWriter().info("### Create CQ. ###" + cqName);
// Get CQ Service.
QueryService qService = null;
@@ -528,10 +528,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
CqAttributesFactory cqf = new CqAttributesFactory();
CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
((CqQueryTestListener)cqListeners[0]).cqName = cqName;
-
+
cqf.initCqListeners(cqListeners);
CqAttributes cqa = cqf.create();
-
+
// Create CQ.
try {
CqQuery cq1 = qService.newCq(cqName, queryStr, cqa);
@@ -543,7 +543,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
throw err;
}
}
- });
+ });
}
/* Register CQs with no name, execute, and close*/
@@ -553,7 +553,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
//pause(60 * 1000);
LogWriterUtils.getLogWriter().info("### DEBUG CREATE CQ START ####");
//pause(20 * 1000);
-
+
LogWriterUtils.getLogWriter().info("### Create CQ with no name. ###");
// Get CQ Service.
QueryService qService = null;
@@ -565,16 +565,16 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
-
+
SelectResults cqResults = null;
for (int i = 0; i < 20; ++i) {
// Create CQ Attributes.
CqAttributesFactory cqf = new CqAttributesFactory();
CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
-
+
cqf.initCqListeners(cqListeners);
CqAttributes cqa = cqf.create();
-
+
// Create CQ with no name and execute with initial results.
try {
cq1 = qService.newCq(queryStr, cqa);
@@ -583,7 +583,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
LogWriterUtils.getLogWriter().info("CQService is :" + qService);
Assert.fail("Failed to create CQ with no name" + " . ", ex);
}
-
+
if (cq1 == null) {
LogWriterUtils.getLogWriter().info("Failed to get CqQuery object for CQ with no name.");
}
@@ -597,7 +597,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
cqResults = cq1.executeWithInitialResults();
} catch (Exception ex){
LogWriterUtils.getLogWriter().info("CqService is :" + qService);
- Assert.fail("Failed to execute CQ with initial results, cq name: "
+ Assert.fail("Failed to execute CQ with initial results, cq name: "
+ cqName + " . ", ex);
}
LogWriterUtils.getLogWriter().info("initial result size = " + cqResults.size());
@@ -614,7 +614,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
LogWriterUtils.getLogWriter().info("CQ state after execute = " + cq1.getState());
assertTrue("execute() state mismatch", cq1.getState().isRunning());
}
-
+
//Close the CQ
try {
cq1.close();
@@ -625,20 +625,20 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
assertTrue("closeCq() state mismatch", cq1.getState().isClosed());
}
}
- });
+ });
}
-
+
public void executeCQ(VM vm, final String cqName, final boolean initialResults,
String expectedErr) {
executeCQ(vm, cqName, initialResults, noTest, null, expectedErr);
- }
-
+ }
+
/**
* Execute/register CQ as running.
* @param initialResults true if initialResults are requested
* @param expectedResultsSize if >= 0, validate results against this size
* @param expectedErr if not null, an error we expect
- */
+ */
public void executeCQ(VM vm, final String cqName,
final boolean initialResults,
final int expectedResultsSize,
@@ -650,7 +650,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
//pause(60 * 1000);
LogWriterUtils.getLogWriter().info("### DEBUG EXECUTE CQ START ####");
//pause(20 * 1000);
-
+
// Get CQ Service.
QueryService cqService = null;
CqQuery cq1 = null;
@@ -663,7 +663,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
// throw err;
// fail("Failed to getCQService.");
// }
-
+
// Get CqQuery object.
try {
cq1 = cqService.getCq(cqName);
@@ -682,10 +682,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
err.initCause(ex);
throw err;
}
-
+
if (initialResults) {
SelectResults cqResults = null;
-
+
try {
cqResults = cq1.executeWithInitialResults();
} catch (Exception ex){
@@ -698,11 +698,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
LogWriterUtils.getLogWriter().info("initial result size = " + cqResults.size());
assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning());
if (expectedResultsSize >= 0) {
- assertEquals("Unexpected results size for CQ: " + cqName +
- " CQ Query :" + cq1.getQueryString(),
+ assertEquals("Unexpected results size for CQ: " + cqName +
+ " CQ Query :" + cq1.getQueryString(),
expectedResultsSize, cqResults.size());
}
-
+
if (expectedKeys != null) {
HashSet resultKeys = new HashSet();
for (Object o : cqResults.asList()) {
@@ -710,14 +710,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
resultKeys.add(s.get("key"));
}
for (int i =0; i < expectedKeys.length; i++){
- assertTrue("Expected key :" + expectedKeys[i] +
- " Not found in CqResults for CQ: " + cqName +
- " CQ Query :" + cq1.getQueryString() +
+ assertTrue("Expected key :" + expectedKeys[i] +
+ " Not found in CqResults for CQ: " + cqName +
+ " CQ Query :" + cq1.getQueryString() +
" Keys in CqResults :" + resultKeys,
resultKeys.contains(expectedKeys[i]));
}
}
- }
+ }
else {
try {
cq1.execute();
@@ -732,7 +732,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
assertTrue("execute() state mismatch", cq1.getState().isRunning());
}
}
-
+
public void run2() throws CacheException {
if (expectedErr != null) {
getCache().getLogger().info("<ExpectedException action=add>"
@@ -740,7 +740,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
try {
work();
- }
+ }
finally {
if (expectedErr != null) {
getCache().getLogger().info("<ExpectedException action=remove>"
@@ -748,9 +748,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
}
}
- });
+ });
}
-
+
/* Stop/pause CQ */
public void stopCQ(VM vm, final String cqName) throws Exception {
vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) {
@@ -763,7 +763,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
-
+
// Stop CQ.
CqQuery cq1 = null;
try {
@@ -776,7 +776,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
});
}
-
+
// Stop and execute CQ repeatedly
/* Stop/pause CQ */
private void stopExecCQ(VM vm, final String cqName, final int count) throws Exception {
@@ -791,14 +791,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
} catch (Exception cqe) {
Assert.fail("Failed to getCqService.", cqe);
}
-
+
// Get CQ.
try {
cq1 = cqService.getCq(cqName);
} catch (Exception ex){
Assert.fail("Failed to get CQ " + cqName + " . ", ex);
}
-
+
for (int i = 0; i < count; ++i) {
// Stop CQ.
try {
@@ -809,7 +809,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
assertTrue("Stop CQ state mismatch, count = " + i, cq1.getState().isStopped());
LogWriterUtils.getLogWriter().info("After stop in Stop and Execute loop, ran successfully, loop count: " + i);
LogWriterUtils.getLogWriter().info("CQ state: " + cq1.getState());
-
+
// Re-execute CQ
try {
cq1.execute();
@@ -823,8 +823,8 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
});
}
-
-
+
+
/* UnRegister CQs */
public void closeCQ(VM vm, final String cqName) throws Exception {
vm.invoke(new CacheSerializableRunnable("Close CQ :" + cqName) {
@@ -837,7 +837,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
} catch (Exception cqe) {
Assert.fail("Failed to getCqService.", cqe);
}
-
+
// Close CQ.
CqQuery cq1 = null;
try {
@@ -850,12 +850,12 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
});
}
-
+
/* Register CQs */
public void registerInterestListCQ(VM vm, final String regionName, final int keySize, final boolean all) {
vm.invoke(new CacheSerializableRunnable("Register InterestList and CQ") {
public void run2() throws CacheException {
-
+
// Get CQ Service.
Region region = null;
try {
@@ -867,7 +867,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
throw err;
}
-
+
try {
if (all) {
region.registerInterest("ALL_KEYS");
@@ -884,22 +884,22 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
throw err;
}
}
- });
+ });
}
-
+
/* Validate CQ Count */
public void validateCQCount(VM vm, final int cqCnt) throws Exception {
vm.invoke(new CacheSerializableRunnable("validate cq count") {
public void run2() throws CacheException {
// Get CQ Service.
-
+
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
-
+
int numCqs = 0;
try {
numCqs = cqService.getCqs().length;
@@ -910,9 +910,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
});
}
-
-
- /**
+
+
+ /**
* Throws AssertionError if the CQ can be found or if any other
* error occurs
*/
@@ -922,12 +922,12 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
LogWriterUtils.getLogWriter().info("### Fail if CQ Exists. ### " + cqName);
// Get CQ Service.
QueryService cqService = null;
- try {
+ try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
-
+
CqQuery cQuery = cqService.getCq(cqName);
if (cQuery != null) {
fail("Unexpectedly found CqQuery for CQ : " + cqName);
@@ -935,31 +935,31 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
});
}
-
+
private void validateCQError(VM vm, final String cqName,
final int numError) {
vm.invoke(new CacheSerializableRunnable("Validate CQs") {
public void run2() throws CacheException {
-
+
LogWriterUtils.getLogWriter().info("### Validating CQ. ### " + cqName);
// Get CQ Service.
QueryService cqService = null;
- try {
+ try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
-
+
CqQuery cQuery = cqService.getCq(cqName);
if (cQuery == null) {
fail("Failed to get CqQuery for CQ : " + cqName);
}
-
+
CqAttributes cqAttr = cQuery.getCqAttributes();
CqListener cqListener = cqAttr.getCqListener();
CqQueryTestListener listener = (CqQueryTestListener) cqListener;
listener.printInfo(false);
-
+
// Check for totalEvents count.
if (numError != noTest) {
// Result size validation.
@@ -969,7 +969,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
});
}
-
+
public void validateCQ(VM vm, final String cqName,
final int resultSize,
final int creates,
@@ -978,7 +978,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
validateCQ(vm, cqName, resultSize, creates, updates, deletes,
noTest, noTest, noTest, noTest);
}
-
+
public void validateCQ(VM vm, final String cqName,
final int resultSize,
final int creates,
@@ -993,29 +993,29 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
LogWriterUtils.getLogWriter().info("### Validating CQ. ### " + cqName);
// Get CQ Service.
QueryService cqService = null;
- try {
+ try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
-
+
CqQuery cQuery = cqService.getCq(cqName);
if (cQuery == null) {
fail("Failed to get CqQuery for CQ : " + cqName);
}
-
+
CqAttributes cqAttr = cQuery.getCqAttributes();
CqListener cqListeners[] = cqAttr.getCqListeners();
CqQueryTestListener listener = (CqQueryTestListener) cqListeners[0];
listener.printInfo(false);
-
+
// Check for totalEvents count.
if (totalEvents != noTest) {
// Result size validation.
listener.printInfo(true);
assertEquals("Total Event Count mismatch", totalEvents, listener.getTotalEventCount());
}
-
+
if (resultSize != noTest) {
//SelectResults results = cQuery.getCqResults();
//getLogWriter().info("### CQ Result Size is :" + results.size());
@@ -1025,72 +1025,72 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
fail("test for event counts instead of results size");
// assertEquals("Result Size mismatch", resultSize, listener.getTotalEventCount());
}
-
+
// Check for create count.
if (creates != noTest) {
// Result size validation.
listener.printInfo(true);
assertEquals("Create Event mismatch", creates, listener.getCreateEventCount());
}
-
+
// Check for update count.
if (updates != noTest) {
// Result size validation.
listener.printInfo(true);
assertEquals("Update Event mismatch", updates, listener.getUpdateEventCount());
}
-
+
// Check for delete count.
if (deletes != noTest) {
// Result size validation.
listener.printInfo(true);
assertEquals("Delete Event mismatch", deletes, listener.getDeleteEventCount());
}
-
+
// Check for queryInsert count.
if (queryInserts != noTest) {
// Result size validation.
listener.printInfo(true);
assertEquals("Query Insert Event mismatch", queryInserts, listener.getQueryInsertEventCount());
}
-
+
// Check for queryUpdate count.
if (queryUpdates != noTest) {
// Result size validation.
listener.printInfo(true);
assertEquals("Query Update Event mismatch", queryUpdates, listener.getQueryUpdateEventCount());
}
-
+
// Check for queryDelete count.
if (queryDeletes != noTest) {
// Result size validation.
listener.printInfo(true);
assertEquals("Query Delete Event mismatch", queryDeletes, listener.getQueryDeleteEventCount());
- }
+ }
}
});
}
-
+
public void waitForCreated(VM vm, final String cqName, final String key){
waitForEvent(vm, 0, cqName, key);
}
-
+
public void waitForUpdated(VM vm, final String cqName, final String key){
waitForEvent(vm, 1, cqName, key);
}
-
+
public void waitForDestroyed(VM vm, final String cqName, final String key){
waitForEvent(vm, 2, cqName, key);
}
-
+
public void waitForInvalidated(VM vm, final String cqName, final String key){
waitForEvent(vm, 3, cqName, key);
}
-
+
public void waitForClose(VM vm, final String cqName){
waitForEvent(vm, 4, cqName, null);
}
-
+
public void waitForRegionClear(VM vm, final String cqName){
waitForEvent(vm, 5, cqName, null);
}
@@ -1109,53 +1109,53 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
-
+
CqQuery cQuery = cqService.getCq(cqName);
if (cQuery == null) {
fail("Failed to get CqQuery for CQ : " + cqName);
}
-
+
CqAttributes cqAttr = cQuery.getCqAttributes();
CqListener[] cqListener = cqAttr.getCqListeners();
CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
-
+
switch (event) {
case CREATE :
listener.waitForCreated(key);
break;
-
+
case UPDATE :
listener.waitForUpdated(key);
break;
-
+
case DESTROY :
listener.waitForDestroyed(key);
- break;
-
+ break;
+
case INVALIDATE :
listener.waitForInvalidated(key);
- break;
-
+ break;
+
case CLOSE :
listener.waitForClose();
- break;
+ break;
case REGION_CLEAR :
listener.waitForRegionClear();
- break;
+ break;
case REGION_INVALIDATE :
listener.waitForRegionInvalidate();
- break;
+ break;
}
}
});
}
-
+
/**
* Waits till the CQ state is same as the expected.
- * Waits for max time, if the CQ state is not same as expected
+ * Waits for max time, if the CQ state is not same as expected
* throws exception.
*/
public void waitForCqState(VM vm, final String cqName, final int state) {
@@ -1179,9 +1179,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
// Wait max time, till the CQ state is as expected.
final long start = System.currentTimeMillis();
while (cqState.getState() != state) {
- junit.framework.Assert.assertTrue("Waited over " + MAX_TIME
+ junit.framework.Assert.assertTrue("Waited over " + MAX_TIME
+ "ms for Cq State to be changed to " + state
- + "; consider raising " + WAIT_PROPERTY,
+ + "; consider raising " + WAIT_PROPERTY,
(System.currentTimeMillis() - start) < MAX_TIME);
Wait.pause(100);
}
@@ -1193,33 +1193,33 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
vm.invoke(new CacheSerializableRunnable("validate cq count") {
public void run2() throws CacheException {
// Get CQ Service.
-
+
QueryService cqService = null;
try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
-
+
CqQuery cQuery = cqService.getCq(cqName);
if (cQuery == null) {
fail("Failed to get CqQuery for CQ : " + cqName);
}
-
+
CqAttributes cqAttr = cQuery.getCqAttributes();
CqListener cqListener = cqAttr.getCqListener();
CqQueryTestListener listener = (CqQueryTestListener) cqListener;
- listener.getEventHistory();
+ listener.getEventHistory();
}
});
}
-
+
public void validateQuery(VM vm, final String query, final int resultSize) {
vm.invoke(new CacheSerializableRunnable("Validate Query") {
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Validating Query. ###");
QueryService qs = getCache().getQueryService();
-
+
Query q = qs.newQuery(query);
try {
Object r = q.execute();
@@ -1235,9 +1235,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
});
}
-
+
private Properties getConnectionProps(String[] hosts, int[] ports, Properties newProps) {
-
+
Properties props = new Properties();
String endPoints = "";
String host = hosts[0];
@@ -1252,13 +1252,13 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
endPoints = endPoints + ",";
}
}
-
+
props.setProperty("endpoints", endPoints);
props.setProperty("retryAttempts", "1");
//props.setProperty("establishCallbackConnection", "true");
//props.setProperty("LBPolicy", "Sticky");
//props.setProperty("readTimeout", "120000");
-
+
// Add other property elements.
if (newProps != null) {
Enumeration e = newProps.keys();
@@ -1269,8 +1269,8 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
return props;
}
-
-
+
+
// Exercise CQ attributes mutator functions
private void mutateCQAttributes(VM vm, final String cqName, final int mutator_function) throws Exception {
vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) {
@@ -1284,7 +1284,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
-
+
// Get CQ.
try {
cq1 = cqService.getCq(cqName);
@@ -1297,30 +1297,30 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
switch (mutator_function) {
case CREATE:
// Reinitialize with 2 CQ Listeners
- CqListener cqListenersArray[] = {new CqQueryTestListener(getCache().getLogger()),
+ CqListener cqListenersArray[] = {new CqQueryTestListener(getCache().getLogger()),
new CqQueryTestListener(getCache().getLogger())};
cqAttrMutator.initCqListeners(cqListenersArray);
cqListeners = cqAttr.getCqListeners();
assertEquals("CqListener count mismatch", cqListeners.length, 2);
break;
-
+
case UPDATE:
// Add 2 new CQ Listeners
CqListener newListener1 = new CqQueryTestListener(getCache().getLogger());
CqListener newListener2 = new CqQueryTestListener(getCache().getLogger());
cqAttrMutator.addCqListener(newListener1);
cqAttrMutator.addCqListener(newListener2);
-
+
cqListeners = cqAttr.getCqListeners();
assertEquals("CqListener count mismatch", cqListeners.length, 3);
break;
-
+
case DESTROY:
cqListeners = cqAttr.getCqListeners();
cqAttrMutator.removeCqListener(cqListeners[0]);
cqListeners = cqAttr.getCqListeners();
assertEquals("CqListener count mismatch", cqListeners.length, 2);
-
+
// Remove a listener and validate
cqAttrMutator.removeCqListener(cqListeners[0]);
cqListeners = cqAttr.getCqListeners();
@@ -1330,10 +1330,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
});
}
-
-
-
-
+
+
+
+
/**
* Test for InterestList and CQ registered from same clients.
* @throws Exception
@@ -1347,30 +1347,30 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
createServer(server);
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
+
String poolName = "testInterestListAndCQs";
createPool(client, poolName, host0, thePort);
-
+
createClient(client, thePort, host0);
/* Create CQs. */
- createCQ(client, poolName, "testInterestListAndCQs_0", cqs[0]);
+ createCQ(client, poolName, "testInterestListAndCQs_0", cqs[0]);
validateCQCount(client, 1);
/* Init values at server. */
final int size = 10;
-
- executeCQ(client, "testInterestListAndCQs_0", false, null);
+
+ executeCQ(client, "testInterestListAndCQs_0", false, null);
registerInterestListCQ(client, regions[0], size, false);
-
+
createValues(server, regions[0], size);
// Wait for client to Synch.
-
+
for (int i=1; i <=10; i++){
waitForCreated(client, "testInterestListAndCQs_0", KEY + i);
}
Wait.pause(5 * 1000);
-
+
// validate CQs.
validateCQ(client, "testInterestListAndCQs_0",
/* resultSize: */ noTest,
@@ -1381,17 +1381,17 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 0,
/* queryDeletes: */ 0,
/* totalEvents: */ size);
-
-
+
+
// Validate InterestList.
// CREATE
client.invoke(new CacheSerializableRunnable("validate updates") {
public void run2() throws CacheException {
final Region region = getRootRegion().getSubregion(regions[0]);
assertNotNull(region);
-
+
// Set keys = region.entrySet();
-// assertEquals("Mismatch, number of keys in local region is not equal to the interest list size",
+// assertEquals("Mismatch, number of keys in local region is not equal to the interest list size",
// size, keys.size());
// TODO does this WaitCriterion actually help?
WaitCriterion wc = new WaitCriterion() {
@@ -1401,7 +1401,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
if (sz == size) {
return true;
}
- excuse = "Mismatch, number of keys (" + sz +
+ excuse = "Mismatch, number of keys (" + sz +
") in local region is not equal to the interest list size (" +
size + ")";
return false;
@@ -1411,7 +1411,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
};
Wait.waitForCriterion(wc, 60 * 1000, 1000, true);
-
+
CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 1; i <= 10; i++) {
ctl.waitForCreated(KEY+i);
@@ -1419,24 +1419,24 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
}
});
-
+
// UPDATE
createValues(server, regions[0], size);
// Wait for client to Synch.
for (int i=1; i <=10; i++){
waitForUpdated(client, "testInterestListAndCQs_0", KEY + i);
}
-
-
+
+
client.invoke(new CacheSerializableRunnable("validate updates") {
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regions[0]);
assertNotNull(region);
-
+
Set keys = region.entrySet();
- assertEquals("Mismatch, number of keys in local region is not equal to the interest list size",
+ assertEquals("Mismatch, number of keys in local region is not equal to the interest list size",
size, keys.size());
-
+
CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 1; i <= 10; i++) {
ctl.waitForUpdated(KEY+i);
@@ -1444,7 +1444,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
}
});
-
+
// INVALIDATE
server.invoke(new CacheSerializableRunnable("Invalidate values") {
public void run2() throws CacheException {
@@ -1454,20 +1454,20 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
}
});
-
-
+
+
waitForInvalidated(client, "testInterestListAndCQs_0", KEY + 10);
-
-
+
+
client.invoke(new CacheSerializableRunnable("validate invalidates") {
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regions[0]);
assertNotNull(region);
-
+
Set keys = region.entrySet();
- assertEquals("Mismatch, number of keys in local region is not equal to the interest list size",
+ assertEquals("Mismatch, number of keys in local region is not equal to the interest list size",
size, keys.size());
-
+
CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 1; i <= 10; i++) {
ctl.waitForInvalidated(KEY+i);
@@ -1475,7 +1475,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
}
});
-
+
validateCQ(client, "testInterestListAndCQs_0",
/* resultSize: */ noTest,
/* creates: */ size,
@@ -1485,7 +1485,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ size,
/* queryDeletes: */ size,
/* totalEvents: */ size * 3);
-
+
// DESTROY - this should not have any effect on CQ, as the events are
// already destroyed from invalidate events.
server.invoke(new CacheSerializableRunnable("Invalidate values") {
@@ -1502,7 +1502,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(regions[0]);
assertNotNull(region);
-
+
CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 1; i <= 10; i++) {
ctl.waitForDestroyed(KEY+i);
@@ -1523,14 +1523,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
closeClient(client);
closeServer(server);
}
-
-
+
+
/**
* Test for CQ register and UnRegister.
* @throws Exception
*/
public void testCQStopExecute() throws Exception {
-
+
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
@@ -1539,29 +1539,29 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
createServer(server);
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
+
String poolName = "testCQStopExecute";
createPool(client, poolName, host0, thePort);
-
+
//createClient(client, thePort, host0);
/* Create CQs. */
- createCQ(client, poolName, "testCQStopExecute_0", cqs[0]);
+ createCQ(client, poolName, "testCQStopExecute_0", cqs[0]);
validateCQCount(client, 1);
-
+
executeCQ(client, "testCQStopExecute_0", false, null);
/* Init values at server. */
int size = 10;
createValues(server, regions[0], size);
// Wait for client to Synch.
-
+
waitForCreated(client, "testCQStopExecute_0", KEY+size);
-
-
+
+
// Check if Client and Server in sync.
//validateServerClientRegionEntries(server, client, regions[0]);
- validateQuery(server, cqs[0], 10);
+ validateQuery(server, cqs[0], 10);
// validate CQs.
//validateCQ(client, "testCQStopExecute_0", size, noTest, noTest, noTest);
validateCQ(client, "testCQStopExecute_0",
@@ -1573,10 +1573,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 0,
/* queryDeletes: */ 0,
/* totalEvents: */ size);
-
+
// Test CQ stop
stopCQ(client, "testCQStopExecute_0");
-
+
// Test CQ re-enable
executeCQ(client, "testCQStopExecute_0", false, null);
@@ -1585,10 +1585,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
// Wait for client to Synch.
waitForCreated(client, "testCQStopExecute_0", KEY+20);
size = 30;
-
+
// Check if Client and Server in sync.
//validateServerClientRegionEntries(server, client, regions[0]);
- validateQuery(server, cqs[0], 20);
+ validateQuery(server, cqs[0], 20);
// validate CQs.
//validateCQ(client, "testCQStopExecute_0", size, noTest, noTest, noTest);
validateCQ(client, "testCQStopExecute_0",
@@ -1600,19 +1600,19 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 10,
/* queryDeletes: */ 0,
/* totalEvents: */ size);
-
-
+
+
// Stop and execute CQ 20 times
stopExecCQ(client, "testCQStopExecute_0", 20);
-
+
// Test CQ Close
closeCQ(client, "testCQStopExecute_0");
-
+
// Close.
closeClient(client);
closeServer(server);
}
-
+
/**
* Test for CQ Attributes Mutator functions
* @throws Exception
@@ -1626,23 +1626,23 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
createServer(server);
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
+
String poolName = "testCQAttributesMutator";
createPool(client, poolName, host0, thePort);
//createClient(client, thePort, host0);
/* Create CQs. */
String cqName = new String("testCQAttributesMutator_0");
- createCQ(client, poolName, cqName, cqs[0]);
- validateCQCount(client, 1);
- executeCQ(client,cqName, false, null);
+ createCQ(client, poolName, cqName, cqs[0]);
+ validateCQCount(client, 1);
+ executeCQ(client,cqName, false, null);
/* Init values at server. */
int size = 10;
createValues(server, regions[0], size);
// Wait for client to Synch.
waitForCreated(client, cqName, KEY + size);
-
+
// validate CQs.
validateCQ(client, cqName,
/* resultSize: */ noTest,
@@ -1653,14 +1653,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 0,
/* queryDeletes: */ 0,
/* totalEvents: */ size);
-
+
// Add 2 new CQ Listeners
mutateCQAttributes(client, cqName, UPDATE);
/* Init values at server. */
createValues(server, regions[0], size * 2);
waitForCreated(client, cqName, KEY + (size * 2));
-
+
validateCQ(client, cqName,
/* resultSize: */ noTest,
/* creates: */ 20,
@@ -1670,10 +1670,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 10,
/* queryDeletes: */ 0,
/* totalEvents: */ 30);
-
+
// Remove 2 listeners and validate
mutateCQAttributes(client, cqName, DESTROY);
-
+
validateCQ(client, cqName,
/* resultSize: */ noTest,
/* creates: */ 10,
@@ -1683,7 +1683,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 10,
/* queryDeletes: */ 0,
/* totalEvents: */ 20);
-
+
// Reinitialize with 2 CQ Listeners
mutateCQAttributes(client, cqName, CREATE);
@@ -1691,7 +1691,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
deleteValues(server, regions[0], 20);
// Wait for client to Synch.
waitForDestroyed(client, cqName, KEY + (size * 2));
-
+
validateCQ(client, cqName,
/* resultSize: */ noTest,
/* creates: */ 0,
@@ -1701,21 +1701,21 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 0,
/* queryDeletes: */ 20,
/* totalEvents: */ 20);
-
+
// Close CQ
closeCQ(client, cqName);
-
+
// Close.
closeClient(client);
closeServer(server);
}
-
+
/**
* Test for CQ register and UnRegister.
* @throws Exception
*/
public void testCQCreateClose() throws Exception {
-
+
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
@@ -1724,7 +1724,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
createServer(server);
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
+
String poolName = "testCQCreateClose";
System.out.println("##### Pool Name :" + poolName + " host :" + host0 + " port :" + thePort);
createPool(client, poolName, host0, thePort);
@@ -1737,9 +1737,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
//getLogWriter().info("### DEBUG START ####");
/* Create CQs. */
- createCQ(client, poolName, "testCQCreateClose_0", cqs[0]);
+ createCQ(client, poolName, "testCQCreateClose_0", cqs[0]);
validateCQCount(client, 1);
-
+
executeCQ(client, "testCQCreateClose_0", false, null);
/* Init values at server. */
@@ -1747,10 +1747,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
createValues(server, regions[0], size);
// Wait for client to Synch.
waitForCreated(client, "testCQCreateClose_0", KEY+size);
-
+
// Check if Client and Server in sync.
//validateServerClientRegionEntries(server, client, regions[0]);
- validateQuery(server, cqs[0], 10);
+ validateQuery(server, cqs[0], 10);
// validate CQs.
validateCQ(client, "testCQCreateClose_0",
/* resultSize: */ noTest,
@@ -1761,29 +1761,29 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 0,
/* queryDeletes: */ 0,
/* totalEvents: */ size);
-
+
// Test CQ stop
stopCQ(client, "testCQCreateClose_0");
-
+
// Test CQ re-enable
executeCQ(client, "testCQCreateClose_0", false, null);
-
+
// Test CQ Close
closeCQ(client, "testCQCreateClose_0");
-
+
//Create CQs with no name, execute, and close.
// UNCOMMENT....
- createAndExecCQNoName(client, poolName, cqs[0]);
-
+ createAndExecCQNoName(client, poolName, cqs[0]);
+
// Accessing the closed CQ.
failIfCQExists(client, "testCQCreateClose_0");
-
+
// re-Create the cq which is closed.
createCQ(client, poolName, "testCQCreateClose_0", cqs[0]);
/* Test CQ Count */
validateCQCount(client, 1);
-
+
// Registering CQ with same name from same client.
try {
createCQ(client, poolName, "testCQCreateClose_0", cqs[0]);
@@ -1795,43 +1795,43 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
assertTrue("Got wrong exception: " + causeCause.getClass().getName(),
causeCause instanceof CqExistsException);
}
-
+
// Getting values from non-existent CQ.
failIfCQExists(client, "testCQCreateClose_NO");
-
+
// Server Registering CQ.
try {
createCQ(server, "testCQCreateClose_1", cqs[0]);
fail("Trying to create CQ on Cache Server. Should have thrown Exception.");
} catch (com.gemstone.gemfire.test.dunit.RMIException rmiExc) {
Throwable cause = rmiExc.getCause();
- assertTrue("unexpected cause: " + cause.getClass().getName(),
+ assertTrue("unexpected cause: " + cause.getClass().getName(),
cause instanceof AssertionError);
Throwable causeCause = cause.getCause(); // should be a IllegalStateException
assertTrue("Got wrong exception: " + causeCause.getClass().getName(),
causeCause instanceof IllegalStateException);
}
-
+
validateCQCount(client, 1);
-
+
createCQ(client, poolName, "testCQCreateClose_3", cqs[2]);
-
+
validateCQCount(client, 2);
/* Test for closeAllCQs() */
-
+
client.invoke(new CacheSerializableRunnable("CloseAll CQ :") {
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Close All CQ. ###");
// Get CQ Service.
QueryService cqService = null;
- try {
+ try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
LogWriterUtils.getLogWriter().info("Failed to getCQService.", cqe);
Assert.fail("Failed to getCQService.", cqe);
}
-
+
// Close CQ.
try {
cqService.closeCqs();
@@ -1841,30 +1841,30 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
}
});
-
+
validateCQCount(client, 0);
-
+
// Initialize.
createCQ(client, poolName, "testCQCreateClose_2", cqs[1]);
createCQ(client, poolName, "testCQCreateClose_4", cqs[1]);
createCQ(client, poolName, "testCQCreateClose_5", cqs[1]);
-
+
// Execute few of the initialized cqs
executeCQ(client, "testCQCreateClose_4", false, null);
executeCQ(client, "testCQCreateClose_5", false, null);
-
+
// Call close all CQ.
client.invoke(new CacheSerializableRunnable("CloseAll CQ 2 :") {
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("### Close All CQ 2. ###");
// Get CQ Service.
QueryService cqService = null;
- try {
+ try {
cqService = getCache().getQueryService();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
}
-
+
// Close CQ.
try {
cqService.closeCqs();
@@ -1873,12 +1873,12 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
}
});
-
+
// Close.
closeClient(client);
closeServer(server);
}
-
+
/**
* This will test the events after region destroy.
* The CQs on the destroyed region need to be closed.
@@ -1893,31 +1893,31 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
createServer(server);
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
+
String poolName = "testRegionDestroy";
createPool(client, poolName, host0, thePort);
-
+
createClient(client, thePort, host0);
/* Create CQs. */
- createCQ(client, poolName, "testRegionDestroy_0", cqs[0]);
- createCQ(client, poolName, "testRegionDestroy_1", cqs[0]);
- createCQ(client, poolName, "testRegionDestroy_2", cqs[0]);
-
+ createCQ(client, poolName, "testRegionDestroy_0", cqs[0]);
+ createCQ(client, poolName, "testRegionDestroy_1", cqs[0]);
+ createCQ(client, poolName, "testRegionDestroy_2", cqs[0]);
+
executeCQ(client, "testRegionDestroy_0", false, null);
executeCQ(client, "testRegionDestroy_1", false, null);
executeCQ(client, "testRegionDestroy_2", false, null);
/* Init values at server. */
final int size = 10;
- registerInterestListCQ(client, regions[0], size, false);
+ registerInterestListCQ(client, regions[0], size, false);
createValues(server, regions[0], size);
-
+
// Wait for client to Synch.
-
+
waitForCreated(client, "testRegionDestroy_0", KEY + 10);
-
-
+
+
// validate CQs.
validateCQ(client, "testRegionDestroy_0",
/* resultSize: */ noTest,
@@ -1928,7 +1928,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 0,
/* queryDeletes: */ 0,
/* totalEvents: */ size);
-
+
// Validate InterestList.
// CREATE
client.invoke(new CacheSerializableRunnable("validate updates") {
@@ -1954,11 +1954,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
};
Wait.waitForCriterion(wc, 30 * 1000, 250, true);
-
+
Region region = getRootRegion().getSubregion(regions[0]);
assertNotNull(region);
-
- CertifiableTestCacheListener ctl = (CertifiableTestCacheListener)
+
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener)
region.getAttributes().getCacheListener();
for (int i = 1; i <= 10; i++) {
ctl.waitForCreated(KEY+i);
@@ -1966,7 +1966,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
}
});
-
+
// Destroy Region.
server.invoke(new CacheSerializableRunnable("Destroy Region") {
public void run2() throws CacheException {
@@ -1974,20 +1974,20 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
region1.destroyRegion();
}
});
-
+
Wait.pause(2 * 1000);
validateCQCount(client, 0);
-
+
closeClient(client);
closeServer(server);
-
+
}
-
+
/**
* Test for CQ with multiple clients.
*/
public void testCQWithMultipleClients() throws Exception {
-
+
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client1 = host.getVM(1);
@@ -1998,11 +1998,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
createServer(server);
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
+
String poolName1 = "testCQWithMultipleClients1";
String poolName2 = "testCQWithMultipleClients2";
String poolName3 = "testCQWithMultipleClients3";
-
+
createPool(client1, poolName1, host0, thePort);
createPool(client2, poolName2, host0, thePort);
@@ -2012,12 +2012,12 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
createCQ(client2, poolName2, "testCQWithMultipleClients_0", cqs[0]);
executeCQ(client2, "testCQWithMultipleClients_0", false, null);
-
+
int size = 10;
-
+
// Create Values on Server.
createValues(server, regions[0], size);
-
+
waitForCreated(client1, "testCQWithMultipleClients_0", KEY + 10);
/* Validate the CQs */
@@ -2030,9 +2030,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 0,
/* queryDeletes: */ 0,
/* totalEvents: */ size);
-
+
waitForCreated(client2, "testCQWithMultipleClients_0", KEY + 10);
-
+
validateCQ(client2, "testCQWithMultipleClients_0",
/* resultSize: */ noTest,
/* creates: */ size,
@@ -2046,7 +2046,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* Close test */
closeCQ(client1, "testCQWithMultipleClients_0");
-
+
validateCQ(client2, "testCQWithMultipleClients_0",
/* resultSize: */ noTest,
/* creates: */ size,
@@ -2059,19 +2059,19 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* Init new client and create cq */
createPool(client3, poolName3, host0, thePort);
-
+
createCQ(client3, poolName3, "testCQWithMultipleClients_0", cqs[0]);
createCQ(client3, poolName3, "testCQWithMultipleClients_1", cqs[1]);
executeCQ(client3, "testCQWithMultipleClients_0", false, null);
executeCQ(client3, "testCQWithMultipleClients_1", false, null);
-
+
// Update values on Server. This will be updated on new Client CQs.
createValues(server, regions[0], size);
-
-
+
+
waitForUpdated(client3, "testCQWithMultipleClients_0", KEY + 10);
-
-
+
+
validateCQ(client3, "testCQWithMultipleClients_0",
/* resultSize: */ noTest,
/* creates: */ 0,
@@ -2081,7 +2081,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ size,
/* queryDeletes: */ 0,
/* totalEvents: */ size);
-
+
validateCQ(client3, "testCQWithMultipleClients_1",
/* resultSize: */ noTest,
/* creates: */ 0,
@@ -2099,17 +2099,17 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* Close Client Test */
closeClient(client1);
-
+
clearCQListenerEvents(client2, "testCQWithMultipleClients_0");
clearCQListenerEvents(client3, "testCQWithMultipleClients_1");
-
+
// Update values on server, update again.
createValues(server, regions[0], size);
-
-
+
+
waitForUpdated(client2, "testCQWithMultipleClients_0", KEY + 10);
-
-
+
+
validateCQ(client2, "testCQWithMultipleClients_0",
/* resultSize: */ noTest,
/* creates: */ size,
@@ -2119,9 +2119,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ size * 2,
/* queryDeletes: */ 0,
/* totalEvents: */ size * 3);
-
+
waitForUpdated(client3, "testCQWithMultipleClients_1", KEY + 2);
-
+
validateCQ(client3, "testCQWithMultipleClients_1",
/* resultSize: */ noTest,
/* creates: */ 0,
@@ -2137,24 +2137,24 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
closeClient(client3);
closeServer(server);
}
-
+
/**
* Test for CQ ResultSet.
*/
public void testCQResultSet() throws Exception {
-
+
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
-
+
createServer(server);
-
+
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
+
String poolName = "testCQResultSet";
createPool(client, poolName, host0, thePort);
-
+
// Create client.
// createClient(client, thePort, host0);
@@ -2162,10 +2162,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
int size = 10;
createValues(server, regions[0], size);
Wait.pause(1*500);
-
+
// Create CQs.
- createCQ(client, poolName, "testCQResultSet_0", cqs[0]);
-
+ createCQ(client, poolName, "testCQResultSet_0", cqs[0]);
+
// Check resultSet Size.
executeCQ(client, "testCQResultSet_0", true, 10, null, null);
@@ -2174,11 +2174,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
// Check resultSet Size.
executeCQ(client, "testCQResultSet_1", true, 0, null, null);
stopCQ(client, "testCQResultSet_1");
-
+
// Init values.
- createValues(server, regions[1], 5);
+ createValues(server, regions[1], 5);
validateQuery(server, cqs[2], 2);
-
+
executeCQ(client, "testCQResultSet_1", true, 2, null, null);
/* compare values...
@@ -2205,24 +2205,24 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
*/
-
+
// Close.
- closeClient(client);
+ closeClient(client);
closeServer(server);
}
-
+
/**
* Test for CQ Listener events.
*
*/
public void testCQEvents() throws Exception {
-
+
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
-
+
createServer(server);
-
+
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
@@ -2231,18 +2231,18 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
// Create client.
//createClient(client, thePort, host0);
-
+
// Create CQs.
createCQ(client, poolName, "testCQEvents_0", cqs[0]);
-
- executeCQ(client, "testCQEvents_0", false, null);
-
+
+ executeCQ(client, "testCQEvents_0", false, null);
+
// Init values at server.
int size = 10;
createValues(server, regions[0], size);
-
+
waitForCreated(client, "testCQEvents_0", KEY+size);
-
+
// validate Create events.
validateCQ(client, "testCQEvents_0",
/* resultSize: */ noTest,
@@ -2253,14 +2253,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 0,
/* queryDeletes: */ 0,
/* totalEvents: */ size);
-
+
// Update values.
createValues(server, regions[0], 5);
createValues(server, regions[0], 10);
-
+
waitForUpdated(client, "testCQEvents_0", KEY+size);
-
-
+
+
// validate Update events.
validateCQ(client, "testCQEvents_0",
/* resultSize: */ noTest,
@@ -2271,11 +2271,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 15,
/* queryDeletes: */ 0,
/* totalEvents: */ size + 15);
-
+
// Validate delete events.
deleteValues(server, regions[0], 5);
waitForDestroyed(client, "testCQEvents_0", KEY+5);
-
+
validateCQ(client, "testCQEvents_0",
/* resultSize: */ noTest,
/* creates: */ size,
@@ -2285,7 +2285,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 15,
/* queryDeletes: */ 5,
/* totalEvents: */ size + 15 + 5);
-
+
// Insert invalid Events.
server.invoke(new CacheSerializableRunnable("Create values") {
public void run2() throws CacheException {
@@ -2297,7 +2297,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
}
});
-
+
Wait.pause(1 * 1000);
// cqs should not get any creates, deletes or updates. rdubey.
validateCQ(client, "testCQEvents_0",
@@ -2309,25 +2309,25 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 15,
/* queryDeletes: */ 5,
/* totalEvents: */ size + 15 + 5);
-
+
// Close.
closeClient(client);
closeServer(server);
}
-
+
/**
* Test query execution multiple times on server without ALIAS.
* @throws Exception
*/
public void testCqEventsWithoutAlias() throws Exception {
-
+
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
-
+
createServer(server);
-
+
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
@@ -2336,18 +2336,18 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
// Create client.
//createClient(client, thePort, host0);
-
+
// Create CQs.
createCQ(client, poolName, "testCQEvents_0", cqs[11]);
-
- executeCQ(client, "testCQEvents_0", false, null);
-
+
+ executeCQ(client, "testCQEvents_0", false, null);
+
// Init values at server.
int size = 10;
createValues(server, regions[0], size);
-
+
waitForCreated(client, "testCQEvents_0", KEY+size);
-
+
// validate Create events.
validateCQ(client, "testCQEvents_0",
/* resultSize: */ noTest,
@@ -2358,14 +2358,14 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 0,
/* queryDeletes: */ 0,
/* totalEvents: */ size);
-
+
// Update values.
createValues(server, regions[0], 5);
createValues(server, regions[0], 10);
-
+
waitForUpdated(client, "testCQEvents_0", KEY+size);
-
-
+
+
// validate Update events.
validateCQ(client, "testCQEvents_0",
/* resultSize: */ noTest,
@@ -2376,11 +2376,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 15,
/* queryDeletes: */ 0,
/* totalEvents: */ size + 15);
-
+
// Validate delete events.
deleteValues(server, regions[0], 5);
waitForDestroyed(client, "testCQEvents_0", KEY+5);
-
+
validateCQ(client, "testCQEvents_0",
/* resultSize: */ noTest,
/* creates: */ size,
@@ -2390,7 +2390,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 15,
/* queryDeletes: */ 5,
/* totalEvents: */ size + 15 + 5);
-
+
// Insert invalid Events.
server.invoke(new CacheSerializableRunnable("Create values") {
public void run2() throws CacheException {
@@ -2402,7 +2402,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
}
});
-
+
Wait.pause(1 * 1000);
// cqs should not get any creates, deletes or updates. rdubey.
validateCQ(client, "testCQEvents_0",
@@ -2414,7 +2414,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ 15,
/* queryDeletes: */ 5,
/* totalEvents: */ size + 15 + 5);
-
+
// Close.
closeClient(client);
closeServer(server);
@@ -2427,9 +2427,9 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
-
+
createServer(server);
-
+
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
@@ -2438,7 +2438,7 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
// Create client.
//createClient(client, thePort, host0);
-
+
// Create CQs.
createCQ(client, poolName, "testEnableDisable_0", cqs[0]);
executeCQ(client, "testEnableDisable_0", false, null);
@@ -2453,10 +2453,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
cqService.stopCqs();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
- }
- }
+ }
+ }
});
-
+
Wait.pause(1 * 1000);
// Init values at server.
int size = 10;
@@ -2483,11 +2483,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
cqService.executeCqs();
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
- }
+ }
}
});
Wait.pause(1 * 1000);
- createValues(server, regions[0], size);
+ createValues(server, regions[0], size);
waitForUpdated(client, "testEnableDisable_0", KEY+size);
// It gets created on the CQs
validateCQ(client, "testEnableDisable_0",
@@ -2510,13 +2510,13 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
cqService.stopCqs("/root/" + regions[0]);
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
- }
+ }
}
});
-
+
Wait.pause(2 * 1000);
deleteValues(server, regions[0], size / 2);
- Wait.pause(1 * 500);
+ Wait.pause(1 * 500);
// There should not be any deletes.
validateCQ(client, "testEnableDisable_0",
/* resultSize: */ noTest,
@@ -2538,11 +2538,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
cqService.executeCqs("/root/" + regions[0]);
} catch (Exception cqe) {
Assert.fail("Failed to getCQService.", cqe);
- }
+ }
}
});
Wait.pause(1 * 1000);
- createValues(server, regions[0], size / 2);
+ createValues(server, regions[0], size / 2);
waitForCreated(client, "testEnableDisable_0", KEY+(size / 2));
// Gets updated on the CQ.
validateCQ(client, "testEnableDisable_0",
@@ -2554,12 +2554,12 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
/* queryUpdates: */ size,
/* queryDeletes: */ 0,
/* totalEvents: */ size * 3 / 2);
-
+
// Close.
closeClient(client);
- closeServer(server);
+ closeServer(server);
}
-
+
/**
* Test for Complex queries.
* @throws Exception
@@ -2568,42 +2568,42 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
-
+
createServer(server);
-
+
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
+
String poolName = "testQuery";
createPool(client, poolName, host0, thePort);
-
+
// Create client.
createClient(client, thePort, host0);
-
+
// Create CQs.
createCQ(client, poolName, "testQuery_3", cqs[3]);
executeCQ(client, "testQuery_3", true, null);
-
+
createCQ(client, poolName, "testQuery_4", cqs[4]);
executeCQ(client, "testQuery_4", true, null);
-
+
createCQ(client, poolName, "testQuery_5", cqs[5]);
executeCQ(client, "testQuery_5", true, null);
-
+
createCQ(client, poolName, "testQuery_6", cqs[6]);
executeCQ(client, "testQuery_6", true, null);
-
+
createCQ(client, poolName, "testQuery_7", cqs[7]);
executeCQ(client, "testQuery_7", true, null);
-
+
createCQ(client, poolName, "testQuery_8", cqs[8]);
executeCQ(client, "testQuery_8", true, null);
-
+
// Close.
closeClient(client);
closeServer(server);
}
-
+
/**
* Test for CQ Fail over.
* @throws Exception
@@ -2613,17 +2613,17 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
-
+
createServer(server1);
-
+
final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
// Create client.
// Properties props = new Properties();
// Create client with redundancyLevel -1
-
+
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
-
+
//createClient(client, new int[] {port1, ports[0]}, host0, "-1");
String poolName = "testCQFailOver";
@@ -2636,63 +2636,63 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
executeCQ(client, "testCQFailOver_" + i, false, null);
}
Wait.pause(1 * 1000);
-
+
// CREATE.
createValues(server1, regions[0], 10);
createValues(server1, regions[1], 10);
waitForCreated(client, "testCQFailOver_0", KEY+10);
Wait.pause(1 * 1000);
-
+
createServer(server2, ports[0]);
final int thePort2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
- System.out.println("### Port on which server1 running : " + port1 +
+ System.out.println("### Port on which server1 running : " + port1 +
" Server2 running : " + thePort2);
Wait.pause(3 * 1000);
// Extra pause - added after downmerging trunk r17050
Wait.pause(5 * 1000);
-
+
// UPDATE - 1.
- createValues(server1, regions[0], 10);
+ createValues(server1, regions[0], 10);
createValues(server1, regions[1], 10);
-
+
waitForUpdated(client, "testCQFailOver_0", KEY+10);
-
+
int[] resultsCnt = new int[] {10, 1, 2};
-
+
for (int i=0; i < numCQs; i++) {
validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest);
- }
-
+ }
+
// Close server1.
closeServer(server1);
-
+
// Fail over should happen.
Wait.pause(3 * 1000);
-
+
for (int i=0; i < numCQs; i++) {
validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest);
- }
-
+ }
+
// UPDATE - 2
this.clearCQListenerEvents(client, "testCQFailOver_0");
createValues(server2, regions[0], 10);
createValues(server2, regions[1], 10);
-
+
for (int i=1; i <= 10; i++) {
waitForUpdated(client, "testCQFailOver_0", KEY+i);
}
-
+
for (int i=0; i < numCQs; i++) {
validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i] * 2, noTest);
- }
-
+ }
+
// Close.
closeClient(client);
closeServer(server2);
}
-
+
/**
* Test for CQ Fail over/HA with redundancy level set.
* @throws Exception
@@ -2702,37 +2702,37 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM server3 = host.getVM(2);
-
+
VM client = host.getVM(3);
-
+
//Killing servers can cause this message on the client side.
IgnoredException.addIgnoredException("Could not find any server");
-
+
createServer(server1);
-
+
final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
-
+
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
-
+
createServer(server2, ports[0]);
final int thePort2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
-
+
createServer(server3, ports[1]);
final int port3 = server3.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
- System.out.println("### Port on which server1 running : " + port1 +
- " server2 running : " + thePort2 +
+ System.out.println("### Port on which server1 running : " + port1 +
+ " server2 running : " + thePort2 +
" Server3 running : " + port3);
-
-
+
+
// Create client - With 3 server endpoints and redundancy level set to 2.
-
+
String poolName = "testCQStopExecute";
createPool(client, poolName, new String[] {host0, host0, host0}, new int[] {port1, thePort2, port3});
-
+
// Create client with redundancyLevel 1
//createClient(client, new int[] {port1, thePort2, port3}, host0, "1");
-
+
// Create CQs.
int numCQs = 1;
for (int i=0; i < numCQs; i++) {
@@ -2740,64 +2740,64 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
createCQ(client, poolName, "testCQHA_" + i, cqs[i]);
executeCQ(client, "testCQHA_" + i, false, null);
}
-
+
Wait.pause(1 * 1000);
-
+
// CREATE.
createValues(server1, regions[0], 10);
createValues(server1, regions[1], 10);
-
-
+
+
waitForCreated(client, "testCQHA_0", KEY + 10);
-
-
+
+
// Clients expected initial result.
int[] resultsCnt = new int[] {10, 1, 2};
-
+
// Close server1.
// To maintain the redundancy; it will make connection to endpoint-3.
closeServer(server1);
Wait.pause(3 * 1000);
-
+
// UPDATE-1.
createValues(server2, regions[0], 10);
createValues(server2, regions[1], 10);
-
-
+
+
waitForUpdated(client, "testCQHA_0", KEY + 10);
-
-
+
+
// Validate CQ.
for (int i=0; i < numCQs; i++) {
validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest);
- }
-
+ }
+
// Close server-2
closeServer(server2);
Wait.pause(2 * 1000);
-
+
// UPDATE - 2.
clearCQListenerEvents(client, "testCQHA_0");
-
- createValues(server3, regions[0], 10);
+
+ createValues(server3, regions[0], 10);
createValues(server3, regions[1], 10);
-
+
// Wait for events at client.
-
+
waitForUpdated(client, "testCQHA_0", KEY + 10);
-
-
+
+
for (int i=0; i < numCQs; i++) {
validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i] * 2, noTest);
- }
-
+ }
+
// Close.
closeClient(client);
closeServer(server3);
}
/**
- * Test Filter registration during GII.
+ * Test Filter registration during GII.
* Bug fix 39014
* @throws Exception
*/
@@ -2808,22 +2808,20 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
VM client1 = host.getVM(2);
VM client2 = host.getVM(3);
- Wait.pause(3 * 1000);
-
createServer(server1);
-
+
final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
-
+
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
-
+
String poolName = "testFilterRegistrationDuringGII";
createPool(client1, poolName, new String[] {host0, host0}, new int[] {port1, ports[0]}, "-1");
createPool(client2, poolName, new String[] {host0, host0}, new int[] {port1, ports[0]}, "-1");
-
+
createClient(client1, new int[] {port1, ports[0]}, host0, "-1", poolName);
createClient(client2, new int[] {port1, ports[0]}, host0, "-1", poolName);
-
+
// Create CQs.
final int numCQs = 2;
for (int i=0; i < numCQs; i++) {
@@ -2833,22 +2831,23 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
createCQ(client2, poolName, "client2_" + i, cqs[i]);
executeCQ(client2, "client2_" + i, false, null);
}
-
+
final int interestSize = 20;
registerInterestListCQ(client1, regions[0], interestSize, false);
registerInterestListCQ(client2, regions[0], 0, true);
-
+
Wait.pause(1 * 1000);
-
+
// CREATE.
createValues(server1, regions[0], 100);
createValues(server1, regions[1], 10);
-
+
waitForCreated(client1, "client1_0", KEY + 10);
// Create server2.
- server2.invoke(new CacheSerializableRunnable("Create Cache Server") {
- public void run2() throws CacheException
+ server2.invoke(new CacheSerializableRunnable("Create Cache Server",new Object[]{100}) {
+ public void run2() throws CacheException{}
+ public void run3() throws CacheException
{
LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
AttributesFactory factory = new AttributesFactory();
@@ -2868,7 +2867,8 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
while(true) {
- if (InitialImageOperation.slowImageSleeps > 0) {
+// if (InitialImageOperation.slowImageSleeps > 0) {
+ if ((int) args[0] > 0) {
// Create events while GII for HARegion is in progress.
LocalRegion region1 = (LocalRegion)getRootRegion().getSubregion(regions[0]);
for (int i = 90; i <= 120; i++) {
@@ -2886,8 +2886,8 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
);
Wait.pause(3 * 1000);
-
-
+
+
// Check if CQs are registered as part of GII.
server2.invoke(new CacheSerializableRunnable("Create values") {
public void run2() throws CacheException {
@@ -2896,11 +2896,11 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
Iterator iter = proxies.iterator();
try {
for (CacheClientProxy p : proxies){
- ClientProxyMembershipID clientId = p.getProxyID();
+ ClientProxyMembershipID clientId = p.getProxyID();
List cqs = qs.getCqService().getAllClientCqs(clientId);
getCache().getLogger().fine("Number of CQs found for client :" + clientId + " are :" + cqs.size());
if (cqs.size() != numCQs) {
- fail("Number of CQs registerted by the client is :" + cqs.size() +
+ fail("Number of CQs registerted by the client is :" + cqs.size() +
" less than expected : " + numCQs);
}
CqQuery cq = (CqQuery)cqs.get(0);
@@ -2940,15 +2940,15 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
VM server1 = host.getVM(0);
VM server2 = host.getVM(1);
VM client = host.getVM(2);
-
+
createServer(server1);
createServer(server2);
-
+
final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
-
+
final int thePort2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
-
+
SerializableRunnable createConnectionPool =
new CacheSerializableRunnable("Create region") {
public void run2() throws CacheException {
@@ -2956,17 +2956,17 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
IgnoredException.addIgnoredException("java.net.ConnectException||java.net.SocketException");
AttributesFactory regionFactory = new AttributesFactory();
regionFactory.setScope(Scope.LOCAL);
-
+
ClientServerTestCase.configureConnectionPool(regionFactory, host0, port1, thePort2, true, -1, -1, null);
-
+
createRegion(regions[0], regionFactory.createRegionAttributes());
}
};
-
-
+
+
// Create client.
client.invoke(createConnectionPool);
-
+
server1.invoke(new CacheSerializableRunnable("Create values") {
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regions[0]);
@@ -2975,25 +2975,25 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
}
}
});
-
+
// Put some values on the client.
client.invoke(new CacheSerializableRunnable("Put values client") {
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regions[0]);
-
+
for (int i = 0; i < 10; i++) {
region1.put("key-string-"+i, "client-value-"+i);
}
}
});
-
-
+
+
Wait.pause(2 * 1000);
closeServer(server1);
closeServer(server2);
}
-
-
+
+
/**
* Test getCQs for a regions
*/
@@ -3001,37 +3001,37 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
-
+
createServer(server);
-
+
final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
+
// Create client.
// createClient(client, thePort, host0);
-
+
String poolName = "testGetCQsForARegionName";
createPool(client, poolName, host0, thePort);
-
+
// Create CQs.
createCQ(client, poolName, "testQuery_3", cqs[3]);
executeCQ(client, "testQuery_3", true, null);
-
+
createCQ(client, poolName, "testQuery_4", cqs[4]);
executeCQ(client, "testQuery_4", true, null);
-
+
createCQ(client, poolName, "testQuery_5", cqs[5]);
executeCQ(client, "testQuery_5", true, null);
-
+
createCQ(client, poolName, "testQuery_6", cqs[6]);
executeCQ(client, "testQuery_6", true, null);
//with regions[1]
createCQ(client, poolName, "testQuery_7", cqs[7]);
executeCQ(client, "testQuery_7", true, null);
-
+
createCQ(client, poolName, "testQuery_8", cqs[8]);
executeCQ(client, "testQuery_8", true, null);
-
+
client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
public void run2() throws CacheException {
// Get CQ Service.
@@ -3042,31 +3042,31 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
assertNotNull("CQservice should not return null for cqs on this region : /root/"+regions[0], cq);
getCache().getLogger().info("cqs for region: /root/"+regions[0]+" : "+cq.length);
// closing on of the cqs.
-
+
cq[0].close();
cq = cqService.getCqs("/root/"+regions[0]);
assertNotNull("CQservice should not return null for cqs on this region : /root/"+regions[0], cq);
getCache().getLogger().info("cqs for region: /root/"+regions[0]+" after closeing one of the cqs : "+cq.length);
-
+
cq = cqService.getCqs("/root/"+regions[1]);
getCache().getLogger().info("cqs for region: /root/"+regions[1]+" : "+cq.length);
assertNotNull("CQservice should not return null for cqs on this region : /root/"+regions[1], cq);
} catch (Exception cqe) {
Assert.fail("Failed to getCQService", cqe);
- }
+ }
}
});
-
+
// Close.
closeClient(client);
closeServer(server);
-
+
}
-
+
/**
* Tests execution of queries with NULL in where clause like where ID = NULL
* etc.
- *
+ *
* @throws Exception
*/
public void testQueryWithNULLInWhereClause() throws Exception
@@ -3084,10 +3084,10 @@ public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
String poolName = "testQueryWithNULLInWhereClause"
<TRUNCATED>