You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/31 23:15:19 UTC
[20/35] geode git commit: GEODE-2632: refactoring preparations for
SecurityService and BaseCommand changes
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
index 4028ab3..929093d 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
@@ -15,6 +15,8 @@
package org.apache.geode.internal.cache.ha;
import static org.apache.geode.distributed.ConfigurationProperties.*;
+import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.number.OrderingComparison.*;
import static org.junit.Assert.*;
import java.io.IOException;
@@ -23,101 +25,75 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.awaitility.Awaitility;
-
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.ErrorCollector;
+import org.junit.rules.TestName;
-import org.apache.geode.LogWriter;
-import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.test.dunit.ThreadUtils;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
import org.apache.geode.test.junit.categories.IntegrationTest;
/**
* This is a test for the APIs of a HARegionQueue and verifies that the head, tail and size counters
* are updated properly.
+ *
+ * TODO: need to rewrite a bunch of tests in HARegionQueueJUnitTest
*/
@Category({IntegrationTest.class, ClientSubscriptionTest.class})
public class HARegionQueueJUnitTest {
- /** The cache instance */
- protected InternalCache cache = null;
+ /** total number of threads doing put operations */
+ private static final int TOTAL_PUT_THREADS = 10;
- /** Logger for this test */
- protected LogWriter logger;
+ private static HARegionQueue hrqForTestSafeConflationRemoval;
+ private static List list1;
- /** The <code>RegionQueue</code> instance */
- protected HARegionQueue rq;
+ protected InternalCache cache;
+ private HARegionQueue haRegionQueue;
- /** total number of threads doing put operations */
- private static final int TOTAL_PUT_THREADS = 10;
+ @Rule
+ public DistributedRestoreSystemProperties restoreSystemProperties =
+ new DistributedRestoreSystemProperties();
- boolean expiryCalled = false;
+ @Rule
+ public ErrorCollector errorCollector = new ErrorCollector();
- volatile boolean encounteredException = false;
- boolean allowExpiryToProceed = false;
- boolean complete = false;
+ @Rule
+ public TestName testName = new TestName();
@Before
public void setUp() throws Exception {
- cache = createCache();
- logger = cache.getLogger();
- encounteredException = false;
+ this.cache = createCache();
}
@After
public void tearDown() throws Exception {
- cache.close();
- }
-
- /**
- * Creates the cache instance for the test
- */
- private InternalCache createCache() throws CacheException {
- return (InternalCache) new CacheFactory().set(MCAST_PORT, "0").create();
- }
-
- /**
- * Creates HA region-queue object
- */
- private HARegionQueue createHARegionQueue(String name)
- throws IOException, ClassNotFoundException, CacheException, InterruptedException {
- HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache,
- HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
- return regionqueue;
- }
-
- /**
- * Creates region-queue object
- */
- private HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs)
- throws IOException, ClassNotFoundException, CacheException, InterruptedException {
- HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, attrs,
- HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
- return regionqueue;
+ this.cache.close();
+ hrqForTestSafeConflationRemoval = null;
}
/**
@@ -129,14 +105,10 @@ public class HARegionQueueJUnitTest {
*/
@Test
public void testQueuePutWithoutConflation() throws Exception {
- logger.info("HARegionQueueJUnitTest : testQueuePutWithoutConflation BEGIN");
-
- rq = createHARegionQueue("testOfferNoConflation");
+ this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
int putPerProducer = 20;
createAndRunProducers(false, false, false, putPerProducer);
- assertEquals(putPerProducer * TOTAL_PUT_THREADS, rq.size());
-
- logger.info("HARegionQueueJUnitTest : testQueuePutWithoutConflation END");
+ assertThat(this.haRegionQueue.size(), is(putPerProducer * TOTAL_PUT_THREADS));
}
/**
@@ -149,14 +121,10 @@ public class HARegionQueueJUnitTest {
*/
@Test
public void testQueuePutWithConflation() throws Exception {
- logger.info("HARegionQueueJUnitTest : testQueuePutWithConflation BEGIN");
-
- rq = createHARegionQueue("testOfferConflation");
+ this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
int putPerProducer = 20;
createAndRunProducers(true, false, true, putPerProducer);
- assertEquals(putPerProducer, rq.size());
-
- logger.info("HARegionQueueJUnitTest : testQueuePutWithConflation END");
+ assertThat(this.haRegionQueue.size(), is(putPerProducer));
}
/**
@@ -166,319 +134,150 @@ public class HARegionQueueJUnitTest {
* 3)Wait till all put-threads complete their job <br>
* 4)verify that the size of the queue is equal to the total number of puts done by one thread (as
* rest of them will be duplicates and hence will be replaced)
- *
- * TODO:Dinesh : Work on optimizing the handling of receiving duplicate events
*/
@Test
public void testQueuePutWithDuplicates() throws Exception {
- logger.info("HARegionQueueJUnitTest : testQueuePutWithDuplicates BEGIN");
-
- rq = createHARegionQueue("testQueuePutWithDuplicates");
+ this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
int putPerProducer = 20;
- // createAndRunProducers(false, true, true, putPerProducer);
- /* Suyog: Only one thread can enter DACE at a time */
createAndRunProducers(false, false, true, putPerProducer);
- assertEquals(putPerProducer * TOTAL_PUT_THREADS, rq.size());
-
- logger.info("HARegionQueueJUnitTest : testQueuePutWithDuplicates END");
- }
-
- /**
- * Creates and runs the put threads which will create the conflatable objects and add them to the
- * queue
- *
- * @param generateSameKeys - if all the producers need to put objects with same set of keys
- * (needed for conflation testing)
- * @param generateSameIds - if all the producers need to put objects with same set of ids (needed
- * for duplicates testing)
- * @param conflationEnabled - true if all producers need to put objects with conflation enabled,
- * false otherwise.
- * @param putPerProducer - number of objects offered to the queue by each producer
- * @throws Exception - thrown if any problem occurs in test execution
- */
- private void createAndRunProducers(boolean generateSameKeys, boolean generateSameIds,
- boolean conflationEnabled, int putPerProducer) throws Exception {
- Producer[] putThreads = new Producer[TOTAL_PUT_THREADS];
-
- int i = 0;
-
- // Create the put-threads, each generating same/different set of ids/keys as
- // per the parameters
- for (i = 0; i < TOTAL_PUT_THREADS; i++) {
- String keyPrefix = null;
- long startId;
- if (generateSameKeys) {
- keyPrefix = "key";
- } else {
- keyPrefix = i + "key";
- }
- if (generateSameIds) {
- startId = 1;
- } else {
- startId = i * 100000;
- }
- putThreads[i] =
- new Producer("Producer-" + i, keyPrefix, startId, putPerProducer, conflationEnabled);
- }
-
- // start the put-threads
- for (i = 0; i < TOTAL_PUT_THREADS; i++) {
- putThreads[i].start();
- }
-
- // call join on the put-threads so that this thread waits till they complete
- // before doing verfication
- for (i = 0; i < TOTAL_PUT_THREADS; i++) {
- ThreadUtils.join(putThreads[i], 30 * 1000);
- }
- assertFalse(encounteredException);
+ assertThat(this.haRegionQueue.size(), is(putPerProducer * TOTAL_PUT_THREADS));
}
/*
* Test method for 'org.apache.geode.internal.cache.ha.HARegionQueue.addDispatchedMessage(Object)'
*/
@Test
- public void testAddDispatchedMessageObject() {
- try {
- // HARegionQueue haRegionQueue = new HARegionQueue("testing", cache);
- HARegionQueue haRegionQueue = createHARegionQueue("testing");
- assertTrue(HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty());
- // TODO:
-
- haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 1), 1);
- haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 2), 2);
+ public void testAddDispatchedMessageObject() throws Exception {
+ this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
+ assertThat(HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty(), is(true));
- assertTrue(!HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty());
- // HARegionQueue.getDispatchedMessagesMapForTesting().clear();
+ this.haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 1), 1);
+ this.haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 2), 2);
- } catch (Exception e) {
- throw new AssertionError("Test encountered an exception due to ", e);
- }
+ assertThat(!HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty(), is(true));
}
/**
* tests the blocking peek functionality of BlockingHARegionQueue
*/
@Test
- public void testBlockQueue() {
- exceptionInThread = false;
- testFailed = false;
- try {
- final HARegionQueue bQ = HARegionQueue.getHARegionQueueInstance("testing", cache,
- HARegionQueue.BLOCKING_HA_QUEUE, false);
- Thread[] threads = new Thread[10];
- final CyclicBarrier barrier = new CyclicBarrier(threads.length + 1);
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new Thread() {
- public void run() {
- try {
- barrier.await();
- long startTime = System.currentTimeMillis();
- Object obj = bQ.peek();
- if (obj == null) {
- testFailed = true;
- message.append(
- " Failed : failed since object was null and was not expected to be null \n");
- }
- long totalTime = System.currentTimeMillis() - startTime;
+ public void testBlockQueue() throws Exception {
+ HARegionQueue regionQueue = HARegionQueue.getHARegionQueueInstance(
+ this.testName.getMethodName(), this.cache, HARegionQueue.BLOCKING_HA_QUEUE, false);
+ Thread[] threads = new Thread[10];
+ int threadsLength = threads.length;
+ CyclicBarrier barrier = new CyclicBarrier(threadsLength + 1);
+
+ for (int i = 0; i < threadsLength; i++) {
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ barrier.await();
+ long startTime = System.currentTimeMillis();
+ Object obj = regionQueue.peek();
+ if (obj == null) {
+ errorCollector.addError(new AssertionError(
+ "Failed : failed since object was null and was not expected to be null"));
+ }
+ long totalTime = System.currentTimeMillis() - startTime;
- if (totalTime < 2000) {
- testFailed = true;
- message
- .append(" Failed : Expected time to be greater than 2000 but it is not so ");
- }
- } catch (Exception e) {
- exceptionInThread = true;
- exception = e;
+ if (totalTime < 2000) {
+ errorCollector.addError(new AssertionError(
+ " Failed : Expected time to be greater than 2000 but it is not so "));
}
+ } catch (Exception e) {
+ errorCollector.addError(e);
}
- };
-
- }
-
- for (int k = 0; k < threads.length; k++) {
- threads[k].start();
- }
- barrier.await();
- Thread.sleep(5000);
-
- EventID id = new EventID(new byte[] {1}, 1, 1);
- bQ.put(new ConflatableObject("key", "value", id, false, "testing"));
-
- long startTime = System.currentTimeMillis();
- for (int k = 0; k < threads.length; k++) {
- ThreadUtils.join(threads[k], 60 * 1000);
- }
-
- long totalTime = System.currentTimeMillis() - startTime;
-
- if (totalTime >= 60000) {
- fail(" Test taken too long ");
- }
-
- if (testFailed) {
- fail(" test failed due to " + message);
- }
-
- } catch (Exception e) {
- throw new AssertionError(" Test failed due to ", e);
+ }
+ };
}
- }
-
- private static volatile int counter = 0;
-
- protected boolean exceptionInThread = false;
-
- protected boolean testFailed = false;
-
- protected StringBuffer message = new StringBuffer();
-
- protected Exception exception = null;
- private synchronized int getCounter() {
- return ++counter;
- }
-
- /**
- * Thread to perform PUTs into the queue
- */
- class Producer extends Thread {
- /** total number of puts by this thread */
- long totalPuts = 0;
-
- /** sleep between successive puts */
- long sleeptime = 10;
-
- /** prefix to keys of all objects put by this thread */
- String keyPrefix;
+ for (Thread thread1 : threads) {
+ thread1.start();
+ }
- /** startingId for sequence-ids of all objects put by this thread */
- long startingId;
+ barrier.await();
- /** name of this producer thread */
- String producerName;
+ Thread.sleep(5000);
- /**
- * boolean to indicate whether this thread should create conflation enabled entries
- */
- boolean createConflatables;
+ EventID id = new EventID(new byte[] {1}, 1, 1);
+ regionQueue
+ .put(new ConflatableObject("key", "value", id, false, this.testName.getMethodName()));
- /**
- * Constructor
- *
- * @param name - name for this thread
- * @param keyPrefix - prefix to keys of all objects put by this thread
- * @param startingId - startingId for sequence-ids of all objects put by this thread
- * @param totalPuts total number of puts by this thread
- * @param createConflatableEvents - boolean to indicate whether this thread should create
- * conflation enabled entries
- */
- Producer(String name, String keyPrefix, long startingId, long totalPuts,
- boolean createConflatableEvents) {
- super(name);
- this.producerName = name;
- this.keyPrefix = keyPrefix;
- this.startingId = startingId;
- this.totalPuts = totalPuts;
- this.createConflatables = createConflatableEvents;
- setDaemon(true);
+ long startTime = System.currentTimeMillis();
+ for (Thread thread : threads) {
+ ThreadUtils.join(thread, 60 * 1000);
}
- /** Create Conflatable objects and put them into the Queue. */
- @Override
- public void run() {
- if (producerName == null) {
- producerName = Thread.currentThread().getName();
- }
- for (long i = 0; i < totalPuts; i++) {
- String REGION_NAME = "test";
- try {
- ConflatableObject event = new ConflatableObject(keyPrefix + i, "val" + i,
- new EventID(new byte[] {1}, startingId, startingId + i), createConflatables,
- REGION_NAME);
-
- logger.fine("putting for key = " + keyPrefix + i);
- rq.put(event);
- Thread.sleep(sleeptime);
- } catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- } catch (Throwable e) {
- logger.severe("Exception while running Producer;continue running.", e);
- encounteredException = true;
- break;
- }
- }
- logger.info(producerName + " : Puts completed");
+ long totalTime = System.currentTimeMillis() - startTime;
+
+ if (totalTime >= 60000) {
+ fail(" Test taken too long ");
}
}
/**
- * tests whether expiry of entry in the regin queue occurs as expected
+ * tests whether expiry of entry in the region queue occurs as expected
*/
@Test
- public void testExpiryPositive()
- throws InterruptedException, IOException, ClassNotFoundException {
+ public void testExpiryPositive() throws Exception {
HARegionQueueAttributes haa = new HARegionQueueAttributes();
haa.setExpiryTime(1);
- HARegionQueue regionqueue = createHARegionQueue("testing", haa);
+
+ HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa);
long start = System.currentTimeMillis();
- regionqueue.put(
- new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing"));
- Map map = (Map) regionqueue.getConflationMapForTesting().get("testing");
+
+ regionQueue.put(new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true,
+ this.testName.getMethodName()));
+
+ Map map = (Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName());
waitAtLeast(1000, start, () -> {
- assertEquals(Collections.EMPTY_MAP, map);
- assertEquals(Collections.EMPTY_SET, regionqueue.getRegion().keys());
+ assertThat(map, is(Collections.emptyMap()));
+ assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet()));
});
}
/**
- * Wait until a given runnable stops throwing exceptions. It should take at least
- * minimumElapsedTime after the supplied start time to happen.
- *
- * This is useful for validating that an entry doesn't expire until a certain amount of time has
- * passed
- */
- protected void waitAtLeast(final int minimumElapsedTIme, final long start,
- final Runnable runnable) {
- Awaitility.await().atMost(1, TimeUnit.MINUTES).until(runnable);
- long elapsed = System.currentTimeMillis() - start;
- assertTrue(elapsed >= minimumElapsedTIme);
- }
-
- /**
* tests whether expiry of a conflated entry in the region queue occurs as expected
*/
@Test
- public void testExpiryPositiveWithConflation()
- throws InterruptedException, IOException, ClassNotFoundException {
+ public void testExpiryPositiveWithConflation() throws Exception {
HARegionQueueAttributes haa = new HARegionQueueAttributes();
haa.setExpiryTime(1);
- HARegionQueue regionqueue = createHARegionQueue("testing", haa);
+
+ HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa);
long start = System.currentTimeMillis();
- regionqueue.put(
- new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing"));
- regionqueue.put(new ConflatableObject("key", "newValue", new EventID(new byte[] {1}, 1, 2),
- true, "testing"));
- assertTrue(
+
+ regionQueue.put(new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true,
+ this.testName.getMethodName()));
+
+ regionQueue.put(new ConflatableObject("key", "newValue", new EventID(new byte[] {1}, 1, 2),
+ true, this.testName.getMethodName()));
+
+ assertThat(
" Expected region size not to be zero since expiry time has not been exceeded but it is not so ",
- !(regionqueue.size() == 0));
- assertTrue(
+ !regionQueue.isEmpty(), is(true));
+ assertThat(
" Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ",
- !(regionqueue.getAvalaibleIds().size() == 0));
- assertTrue(
+ !regionQueue.getAvalaibleIds().isEmpty(), is(true));
+ assertThat(
" Expected conflation map size not to be zero since expiry time has not been exceeded but it is not so "
- + ((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key"))),
- !((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key")) == null));
- assertTrue(
+ + ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+ .get("key"),
+ ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+ .get("key"),
+ not(sameInstance(null)));
+ assertThat(
" Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ",
- !(regionqueue.getEventsMapForTesting().size() == 0));
+ !regionQueue.getEventsMapForTesting().isEmpty(), is(true));
waitAtLeast(1000, start, () -> {
- assertEquals(Collections.EMPTY_SET, regionqueue.getRegion().keys());
- assertEquals(Collections.EMPTY_SET, regionqueue.getAvalaibleIds());
- assertEquals(Collections.EMPTY_MAP, regionqueue.getConflationMapForTesting().get("testing"));
- assertEquals(Collections.EMPTY_MAP, regionqueue.getEventsMapForTesting());
+ assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet()));
+ assertThat(regionQueue.getAvalaibleIds(), is(Collections.emptySet()));
+ assertThat(regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()),
+ is(Collections.emptyMap()));
+ assertThat(regionQueue.getEventsMapForTesting(), is(Collections.emptyMap()));
});
}
@@ -486,38 +285,37 @@ public class HARegionQueueJUnitTest {
* tests a ThreadId not being expired if it was updated
*/
@Test
- public void testNoExpiryOfThreadId() {
- try {
- HARegionQueueAttributes haa = new HARegionQueueAttributes();
- haa.setExpiryTime(45);
- // RegionQueue regionqueue = new HARegionQueue("testing", cache, haa);
- HARegionQueue regionqueue = createHARegionQueue("testing", haa);
- EventID ev1 = new EventID(new byte[] {1}, 1, 1);
- EventID ev2 = new EventID(new byte[] {1}, 1, 2);
- Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing");
- Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, "testing");
- regionqueue.put(cf1);
- final long tailKey = regionqueue.tailKey.get();
- regionqueue.put(cf2);
- // Invalidate will trigger the expiration of the entry
- // See HARegionQueue.createCacheListenerForHARegion
- regionqueue.getRegion().invalidate(tailKey);
- assertTrue(
- " Expected region size not to be zero since expiry time has not been exceeded but it is not so ",
- !(regionqueue.size() == 0));
- assertTrue(" Expected the available id's size not to have counter 1 but it has ",
- !(regionqueue.getAvalaibleIds().contains(new Long(1))));
- assertTrue(" Expected the available id's size to have counter 2 but it does not have ",
- (regionqueue.getAvalaibleIds().contains(new Long(2))));
- assertTrue(" Expected eventID map not to have the first event, but it has",
- !(regionqueue.getCurrentCounterSet(ev1).contains(new Long(1))));
- assertTrue(" Expected eventID map to have the second event, but it does not",
- (regionqueue.getCurrentCounterSet(ev2).contains(new Long(2))));
- }
+ public void testNoExpiryOfThreadId() throws Exception {
+ HARegionQueueAttributes haa = new HARegionQueueAttributes();
+ haa.setExpiryTime(45);
- catch (Exception e) {
- throw new AssertionError("test failed due to ", e);
- }
+ HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa);
+ EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+ EventID ev2 = new EventID(new byte[] {1}, 1, 2);
+ Conflatable cf1 =
+ new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName());
+ Conflatable cf2 =
+ new ConflatableObject("key", "value2", ev2, true, this.testName.getMethodName());
+
+ regionQueue.put(cf1);
+ long tailKey = regionQueue.tailKey.get();
+ regionQueue.put(cf2);
+
+ // Invalidate will trigger the expiration of the entry
+ // See HARegionQueue.createCacheListenerForHARegion
+ regionQueue.getRegion().invalidate(tailKey);
+
+ assertThat(
+ " Expected region size not to be zero since expiry time has not been exceeded but it is not so ",
+ !regionQueue.isEmpty(), is(true));
+ assertThat(" Expected the available id's size not to have counter 1 but it has ",
+ !regionQueue.getAvalaibleIds().contains(1L), is(true));
+ assertThat(" Expected the available id's size to have counter 2 but it does not have ",
+ regionQueue.getAvalaibleIds().contains(2L), is(true));
+ assertThat(" Expected eventID map not to have the first event, but it has",
+ !regionQueue.getCurrentCounterSet(ev1).contains(1L), is(true));
+ assertThat(" Expected eventID map to have the second event, but it does not",
+ regionQueue.getCurrentCounterSet(ev2).contains(2L), is(true));
}
/**
@@ -525,66 +323,64 @@ public class HARegionQueueJUnitTest {
* being put in the queue
*/
@Test
- public void testQRMComingBeforeLocalPut() {
- try {
- // RegionQueue regionqueue = new HARegionQueue("testing", cache);
- HARegionQueue regionqueue = createHARegionQueue("testing");
- EventID id = new EventID(new byte[] {1}, 1, 1);
- regionqueue.removeDispatchedEvents(id);
- regionqueue.put(new ConflatableObject("key", "value", id, true, "testing"));
- assertTrue(" Expected key to be null since QRM for the message id had already arrived ",
- !regionqueue.getRegion().containsKey(new Long(1)));
- } catch (Exception e) {
- throw new AssertionError("test failed due to ", e);
- }
+ public void testQRMComingBeforeLocalPut() throws Exception {
+ HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+ EventID id = new EventID(new byte[] {1}, 1, 1);
+
+ regionQueue.removeDispatchedEvents(id);
+ regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName()));
+
+ assertThat(" Expected key to be null since QRM for the message id had already arrived ",
+ !regionQueue.getRegion().containsKey(1L), is(true));
}
/**
* test verifies correct expiry of ThreadIdentifier in the HARQ if no corresponding put comes
*/
@Test
- public void testOnlyQRMComing() throws InterruptedException, IOException, ClassNotFoundException {
+ public void testOnlyQRMComing() throws Exception {
HARegionQueueAttributes harqAttr = new HARegionQueueAttributes();
harqAttr.setExpiryTime(1);
- // RegionQueue regionqueue = new HARegionQueue("testing", cache, harqAttr);
- HARegionQueue regionqueue = createHARegionQueue("testing", harqAttr);
+
+ HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), harqAttr);
EventID id = new EventID(new byte[] {1}, 1, 1);
long start = System.currentTimeMillis();
- regionqueue.removeDispatchedEvents(id);
- assertTrue(" Expected testingID to be present since only QRM achieved ",
- regionqueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)));
+
+ regionQueue.removeDispatchedEvents(id);
+
+ assertThat(" Expected testingID to be present since only QRM achieved ",
+ regionQueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)), is(true));
+
waitAtLeast(1000, start,
- () -> assertTrue(
+ () -> assertThat(
" Expected testingID not to be present since it should have expired after 2.5 seconds",
- !regionqueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1))));
+ !regionQueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)),
+ is(true)));
}
/**
* test all relevant data structures are updated on a local put
*/
@Test
- public void testPutPath() {
- try {
- HARegionQueue regionqueue = createHARegionQueue("testing");
- Conflatable cf =
- new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing");
- regionqueue.put(cf);
- assertTrue(" Expected region peek to return cf but it is not so ",
- (regionqueue.peek().equals(cf)));
- assertTrue(
- " Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ",
- !(regionqueue.getAvalaibleIds().size() == 0));
- assertTrue(
- " Expected conflation map to have entry for this key since expiry time has not been exceeded but it is not so ",
- ((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key"))
- .equals(new Long(1))));
- assertTrue(
- " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ",
- !(regionqueue.getEventsMapForTesting().size() == 0));
+ public void testPutPath() throws Exception {
+ HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+ Conflatable cf = new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true,
+ this.testName.getMethodName());
- } catch (Exception e) {
- throw new AssertionError("Exception occurred in test due to ", e);
- }
+ regionQueue.put(cf);
+
+ assertThat(" Expected region peek to return cf but it is not so ", regionQueue.peek(), is(cf));
+ assertThat(
+ " Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ",
+ !regionQueue.getAvalaibleIds().isEmpty(), is(true));
+ assertThat(
+ " Expected conflation map to have entry for this key since expiry time has not been exceeded but it is not so ",
+ ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+ .get("key"),
+ is(1L));
+ assertThat(
+ " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ",
+ !regionQueue.getEventsMapForTesting().isEmpty(), is(true));
}
/**
@@ -592,58 +388,64 @@ public class HARegionQueueJUnitTest {
* there - verify the next five entries and their relevant data is present
*/
@Test
- public void testQRMDispatch() {
- try {
- HARegionQueue regionqueue = createHARegionQueue("testing");
- Conflatable[] cf = new Conflatable[10];
- // put 10 conflatable objects
- for (int i = 0; i < 10; i++) {
- cf[i] = new ConflatableObject("key" + i, "value", new EventID(new byte[] {1}, 1, i), true,
- "testing");
- regionqueue.put(cf[i]);
- }
- // remove the first 5 by giving the right sequence id
- regionqueue.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 4));
- // verify 1-5 not in region
- for (long i = 1; i < 6; i++) {
- assertTrue(!regionqueue.getRegion().containsKey(new Long(i)));
- }
- // verify 6-10 still in region queue
- for (long i = 6; i < 11; i++) {
- assertTrue(regionqueue.getRegion().containsKey(new Long(i)));
- }
- // verify 1-5 not in conflation map
- for (long i = 0; i < 5; i++) {
- assertTrue(!((Map) regionqueue.getConflationMapForTesting().get("testing"))
- .containsKey("key" + i));
- }
- // verify 6-10 in conflation map
- for (long i = 5; i < 10; i++) {
- assertTrue(
- ((Map) regionqueue.getConflationMapForTesting().get("testing")).containsKey("key" + i));
- }
+ public void testQRMDispatch() throws Exception {
+ HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+ Conflatable[] cf = new Conflatable[10];
+
+ // put 10 conflatable objects
+ for (int i = 0; i < 10; i++) {
+ cf[i] = new ConflatableObject("key" + i, "value", new EventID(new byte[] {1}, 1, i), true,
+ this.testName.getMethodName());
+ regionQueue.put(cf[i]);
+ }
- EventID eid = new EventID(new byte[] {1}, 1, 6);
- // verify 1-5 not in eventMap
- for (long i = 1; i < 6; i++) {
- assertTrue(!regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
- }
- // verify 6-10 in event Map
- for (long i = 6; i < 11; i++) {
- assertTrue(regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
- }
+ // remove the first 5 by giving the right sequence id
+ regionQueue.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 4));
- // verify 1-5 not in available Id's map
- for (long i = 1; i < 6; i++) {
- assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(i)));
- }
+ // verify 1-5 not in region
+ for (int i = 1; i < 6; i++) {
+ assertThat(!regionQueue.getRegion().containsKey((long) i), is(true));
+ }
- // verify 6-10 in available id's map
- for (long i = 6; i < 11; i++) {
- assertTrue(regionqueue.getAvalaibleIds().contains(new Long(i)));
- }
- } catch (Exception e) {
- throw new AssertionError("Exception occurred in test due to ", e);
+ // verify 6-10 still in region queue
+ for (int i = 6; i < 11; i++) {
+ assertThat(regionQueue.getRegion().containsKey((long) i), is(true));
+ }
+
+ // verify 1-5 not in conflation map
+ for (int i = 0; i < 5; i++) {
+ assertThat(
+ !((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+ .containsKey("key" + i),
+ is(true));
+ }
+
+ // verify 6-10 in conflation map
+ for (int i = 5; i < 10; i++) {
+ assertThat(((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+ .containsKey("key" + i), is(true));
+ }
+
+ EventID eid = new EventID(new byte[] {1}, 1, 6);
+
+ // verify 1-5 not in eventMap
+ for (int i = 1; i < 6; i++) {
+ assertThat(!regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true));
+ }
+
+ // verify 6-10 in event Map
+ for (int i = 6; i < 11; i++) {
+ assertThat(regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true));
+ }
+
+ // verify 1-5 not in available Id's map
+ for (int i = 1; i < 6; i++) {
+ assertThat(!regionQueue.getAvalaibleIds().contains((long) i), is(true));
+ }
+
+ // verify 6-10 in available id's map
+ for (int i = 6; i < 11; i++) {
+ assertThat(regionQueue.getAvalaibleIds().contains((long) i), is(true));
}
}
@@ -652,68 +454,74 @@ public class HARegionQueueJUnitTest {
* 1-7 not there - verify data for 8-10 is there
*/
@Test
- public void testQRMBeforePut() {
- try {
- HARegionQueue regionqueue = createHARegionQueue("testing");
+ public void testQRMBeforePut() throws Exception {
+ HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
- EventID[] ids = new EventID[10];
+ EventID[] ids = new EventID[10];
- for (int i = 0; i < 10; i++) {
- ids[i] = new EventID(new byte[] {1}, 1, i);
- }
+ for (int i = 0; i < 10; i++) {
+ ids[i] = new EventID(new byte[] {1}, 1, i);
+ }
- // first get the qrm message for the seventh id
- regionqueue.removeDispatchedEvents(ids[6]);
- Conflatable[] cf = new Conflatable[10];
- // put 10 conflatable objects
- for (int i = 0; i < 10; i++) {
- cf[i] = new ConflatableObject("key" + i, "value", ids[i], true, "testing");
- regionqueue.put(cf[i]);
- }
+ // first get the qrm message for the seventh id
+ regionQueue.removeDispatchedEvents(ids[6]);
+ Conflatable[] cf = new Conflatable[10];
- // verify 1-7 not in region
- Set values = (Set) regionqueue.getRegion().values();
- for (int i = 0; i < 7; i++) {
- System.out.println(i);
- assertTrue(!values.contains(cf[i]));
- }
- // verify 8-10 still in region queue
- for (int i = 7; i < 10; i++) {
- System.out.println(i);
- assertTrue(values.contains(cf[i]));
- }
- // verify 1-8 not in conflation map
- for (long i = 0; i < 7; i++) {
- assertTrue(!((Map) regionqueue.getConflationMapForTesting().get("testing"))
- .containsKey("key" + i));
- }
- // verify 8-10 in conflation map
- for (long i = 7; i < 10; i++) {
- assertTrue(
- ((Map) regionqueue.getConflationMapForTesting().get("testing")).containsKey("key" + i));
- }
+ // put 10 conflatable objects
+ for (int i = 0; i < 10; i++) {
+ cf[i] =
+ new ConflatableObject("key" + i, "value", ids[i], true, this.testName.getMethodName());
+ regionQueue.put(cf[i]);
+ }
- EventID eid = new EventID(new byte[] {1}, 1, 6);
- // verify 1-7 not in eventMap
- for (long i = 4; i < 11; i++) {
- assertTrue(!regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
- }
- // verify 8-10 in event Map
- for (long i = 1; i < 4; i++) {
- assertTrue(regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
- }
+ // verify 1-7 not in region
+ Set values = (Set) regionQueue.getRegion().values();
- // verify 1-7 not in available Id's map
- for (long i = 4; i < 11; i++) {
- assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(i)));
- }
+ for (int i = 0; i < 7; i++) {
+ System.out.println(i);
+ assertThat(!values.contains(cf[i]), is(true));
+ }
- // verify 8-10 in available id's map
- for (long i = 1; i < 4; i++) {
- assertTrue(regionqueue.getAvalaibleIds().contains(new Long(i)));
- }
- } catch (Exception e) {
- throw new AssertionError("Exception occurred in test due to ", e);
+ // verify 8-10 still in region queue
+ for (int i = 7; i < 10; i++) {
+ System.out.println(i);
+ assertThat(values.contains(cf[i]), is(true));
+ }
+
+ // verify 1-8 not in conflation map
+ for (int i = 0; i < 7; i++) {
+ assertThat(
+ !((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+ .containsKey("key" + i),
+ is(true));
+ }
+
+ // verify 8-10 in conflation map
+ for (int i = 7; i < 10; i++) {
+ assertThat(((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+ .containsKey("key" + i), is(true));
+ }
+
+ EventID eid = new EventID(new byte[] {1}, 1, 6);
+
+ // verify 1-7 not in eventMap
+ for (int i = 4; i < 11; i++) {
+ assertThat(!regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true));
+ }
+
+ // verify 8-10 in event Map
+ for (int i = 1; i < 4; i++) {
+ assertThat(regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true));
+ }
+
+ // verify 1-7 not in available Id's map
+ for (int i = 4; i < 11; i++) {
+ assertThat(!regionQueue.getAvalaibleIds().contains((long) i), is(true));
+ }
+
+ // verify 8-10 in available id's map
+ for (int i = 1; i < 4; i++) {
+ assertThat(regionQueue.getAvalaibleIds().contains((long) i), is(true));
}
}
@@ -721,33 +529,33 @@ public class HARegionQueueJUnitTest {
* test to verify conflation happens as expected
*/
@Test
- public void testConflation() {
- try {
- HARegionQueue regionqueue = createHARegionQueue("testing");
- EventID ev1 = new EventID(new byte[] {1}, 1, 1);
- EventID ev2 = new EventID(new byte[] {1}, 2, 2);
- Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing");
- Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, "testing");
- regionqueue.put(cf1);
- Map conflationMap = regionqueue.getConflationMapForTesting();
- assertTrue(((Map) (conflationMap.get("testing"))).get("key").equals(new Long(1)));
- regionqueue.put(cf2);
- // verify the conflation map has recorded the new key
- assertTrue(((Map) (conflationMap.get("testing"))).get("key").equals(new Long(2)));
- // the old key should not be present
- assertTrue(!regionqueue.getRegion().containsKey(new Long(1)));
- // available ids should not contain the old id (the old position)
- assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(1)));
- // available id should have the new id (the new position)
- assertTrue(regionqueue.getAvalaibleIds().contains(new Long(2)));
- // events map should not contain the old position
- assertTrue(regionqueue.getCurrentCounterSet(ev1).isEmpty());
- // events map should contain the new position
- assertTrue(regionqueue.getCurrentCounterSet(ev2).contains(new Long(2)));
-
- } catch (Exception e) {
- throw new AssertionError("Exception occurred in test due to ", e);
- }
+ public void testConflation() throws Exception {
+ HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+ EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+ EventID ev2 = new EventID(new byte[] {1}, 2, 2);
+ Conflatable cf1 =
+ new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName());
+ Conflatable cf2 =
+ new ConflatableObject("key", "value2", ev2, true, this.testName.getMethodName());
+ regionQueue.put(cf1);
+
+ Map conflationMap = regionQueue.getConflationMapForTesting();
+ assertThat(((Map) conflationMap.get(this.testName.getMethodName())).get("key"), is(1L));
+
+ regionQueue.put(cf2);
+
+ // verify the conflation map has recorded the new key
+ assertThat(((Map) conflationMap.get(this.testName.getMethodName())).get("key"), is(2L));
+ // the old key should not be present
+ assertThat(!regionQueue.getRegion().containsKey(1L), is(true));
+ // available ids should not contain the old id (the old position)
+ assertThat(!regionQueue.getAvalaibleIds().contains(1L), is(true));
+ // available id should have the new id (the new position)
+ assertThat(regionQueue.getAvalaibleIds().contains(2L), is(true));
+ // events map should not contain the old position
+ assertThat(regionQueue.getCurrentCounterSet(ev1).isEmpty(), is(true));
+ // events map should contain the new position
+ assertThat(regionQueue.getCurrentCounterSet(ev2).contains(2L), is(true));
}
/**
@@ -755,97 +563,58 @@ public class HARegionQueueJUnitTest {
* events which are of ID greater than that contained in QRM should stay
*/
@Test
- public void testQRM() {
- try {
- RegionQueue regionqueue = createHARegionQueue("testing");
- for (int i = 0; i < 10; ++i) {
- regionqueue.put(new ConflatableObject("key" + (i + 1), "value",
- new EventID(new byte[] {1}, 1, i + 1), true, "testing"));
- }
- EventID qrmID = new EventID(new byte[] {1}, 1, 5);
- ((HARegionQueue) regionqueue).removeDispatchedEvents(qrmID);
- Map conflationMap = ((HARegionQueue) regionqueue).getConflationMapForTesting();
- assertTrue(((Map) (conflationMap.get("testing"))).size() == 5);
-
- Set availableIDs = ((HARegionQueue) regionqueue).getAvalaibleIds();
- Set counters = ((HARegionQueue) regionqueue).getCurrentCounterSet(qrmID);
- assertTrue(availableIDs.size() == 5);
- assertTrue(counters.size() == 5);
- for (int i = 5; i < 10; ++i) {
- assertTrue(((Map) (conflationMap.get("testing"))).containsKey("key" + (i + 1)));
- assertTrue(availableIDs.contains(new Long((i + 1))));
- assertTrue(counters.contains(new Long((i + 1))));
- }
- Region rgn = ((HARegionQueue) regionqueue).getRegion();
- assertTrue(rgn.keySet().size() == 6);
+ public void testQRM() throws Exception {
+ RegionQueue regionqueue = createHARegionQueue(this.testName.getMethodName());
- } catch (Exception e) {
- throw new AssertionError("Exception occurred in test due to ", e);
+ for (int i = 0; i < 10; ++i) {
+ regionqueue.put(new ConflatableObject("key" + (i + 1), "value",
+ new EventID(new byte[] {1}, 1, i + 1), true, this.testName.getMethodName()));
}
- }
- protected static HARegionQueue hrqFortestSafeConflationRemoval;
+ EventID qrmID = new EventID(new byte[] {1}, 1, 5);
+ ((HARegionQueue) regionqueue).removeDispatchedEvents(qrmID);
+ Map conflationMap = ((HARegionQueue) regionqueue).getConflationMapForTesting();
+ assertThat(((Map) conflationMap.get(this.testName.getMethodName())).size(), is(5));
- /**
- * This test tests safe removal from the conflation map. i.e operations should only remove old
- * values and not the latest value
- */
- @Test
- public void testSafeConflationRemoval() {
- try {
- hrqFortestSafeConflationRemoval = new HARQTestClass("testSafeConflationRemoval",
+ Set availableIDs = ((HARegionQueue) regionqueue).getAvalaibleIds();
+ Set counters = ((HARegionQueue) regionqueue).getCurrentCounterSet(qrmID);
- cache, this);
- Conflatable cf1 = new ConflatableObject("key1", "value", new EventID(new byte[] {1}, 1, 1),
- true, "testSafeConflationRemoval");
- hrqFortestSafeConflationRemoval.put(cf1);
- hrqFortestSafeConflationRemoval.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 1));
- Map map =
-
- (Map) hrqFortestSafeConflationRemoval.getConflationMapForTesting()
- .get("testSafeConflationRemoval");
- assertTrue(
- "Expected the counter to be 2 since it should not have been deleted but it is not so ",
- map.get("key1").equals(new Long(2)));
- hrqFortestSafeConflationRemoval = null;
- } catch (Exception e) {
- throw new AssertionError("Test failed due to ", e);
+ assertThat(availableIDs.size(), is(5));
+ assertThat(counters.size(), is(5));
+
+ for (int i = 5; i < 10; ++i) {
+ assertThat(
+ ((Map) (conflationMap.get(this.testName.getMethodName()))).containsKey("key" + (i + 1)),
+ is(true));
+ assertThat(availableIDs.contains((long) (i + 1)), is(true));
+ assertThat(counters.contains((long) (i + 1)), is(true));
}
+
+ Region rgn = ((HARegionQueue) regionqueue).getRegion();
+ assertThat(rgn.keySet().size(), is(6));
}
/**
- * Extends HARegionQueue for testing purposes. used by testSafeConflationRemoval
+ * This test tests safe removal from the conflation map. i.e operations should only remove old
+ * values and not the latest value
*/
- static class HARQTestClass extends HARegionQueue.TestOnlyHARegionQueue {
+ @Test
+ public void testSafeConflationRemoval() throws Exception {
+ hrqForTestSafeConflationRemoval = new HARQTestClass("testSafeConflationRemoval", this.cache);
+ Conflatable cf1 = new ConflatableObject("key1", "value", new EventID(new byte[] {1}, 1, 1),
+ true, "testSafeConflationRemoval");
- public HARQTestClass(String REGION_NAME, InternalCache cache, HARegionQueueJUnitTest test)
- throws IOException, ClassNotFoundException, CacheException, InterruptedException {
- super(REGION_NAME, cache);
- }
+ hrqForTestSafeConflationRemoval.put(cf1);
+ hrqForTestSafeConflationRemoval.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 1));
- ConcurrentMap createConcurrentMap() {
- return new ConcHashMap();
- }
- }
+ Map map = (Map) hrqForTestSafeConflationRemoval.getConflationMapForTesting()
+ .get("testSafeConflationRemoval");
- /**
- * Used to override the remove method for testSafeConflationRemoval
- */
- static class ConcHashMap extends ConcurrentHashMap implements ConcurrentMap {
- public boolean remove(Object arg0, Object arg1) {
- Conflatable cf2 = new ConflatableObject("key1", "value2", new EventID(new byte[] {1}, 1, 2),
- true, "testSafeConflationRemoval");
- try {
- hrqFortestSafeConflationRemoval.put(cf2);
- } catch (Exception e) {
- throw new AssertionError("Exception occurred in trying to put ", e);
- }
- return super.remove(arg0, arg1);
- }
+ assertThat(
+ "Expected the counter to be 2 since it should not have been deleted but it is not so ",
+ map.get("key1"), is(2L));
}
- static List list1;
-
/**
* This test tests remove operation is causing the insertion of sequence ID for existing
* ThreadIdentifier object and concurrently the QRM thread is iterating over the Map to form the
@@ -864,80 +633,86 @@ public class HARegionQueueJUnitTest {
* It is then verified to see that all the sequence should be greater than x
*/
@Test
- public void testConcurrentDispatcherAndRemovalForSameRegionSameThreadId() {
- try {
- final long numberOfIterations = 1000;
- final HARegionQueue hrq = createHARegionQueue("testConcurrentDispatcherAndRemoval");
- HARegionQueue.stopQRMThread();
- final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
- for (int i = 0; i < numberOfIterations; i++) {
- ids[i] = new ThreadIdentifier(new byte[] {1}, i);
- hrq.addDispatchedMessage(ids[i], i);
- }
- Thread thread1 = new Thread() {
- public void run() {
- try {
- Thread.sleep(600);
- } catch (InterruptedException e) {
- fail("interrupted");
- }
- list1 = HARegionQueue.createMessageListForTesting();
- };
- };
- Thread thread2 = new Thread() {
- public void run() {
- try {
- Thread.sleep(480);
- } catch (InterruptedException e) {
- fail("interrupted");
- }
- for (int i = 0; i < numberOfIterations; i++) {
- hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
- }
- };
- };
- thread1.start();
- thread2.start();
- ThreadUtils.join(thread1, 30 * 1000);
- ThreadUtils.join(thread2, 30 * 1000);
- List list2 = HARegionQueue.createMessageListForTesting();
- Iterator iterator = list1.iterator();
- boolean doOnce = false;
- EventID id = null;
- Map map = new HashMap();
- while (iterator.hasNext()) {
- if (!doOnce) {
- iterator.next();
- iterator.next();
- doOnce = true;
- } else {
- id = (EventID) iterator.next();
- map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+ public void testConcurrentDispatcherAndRemovalForSameRegionSameThreadId() throws Exception {
+ long numberOfIterations = 1000;
+ HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName());
+ HARegionQueue.stopQRMThread();
+ ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
+
+ for (int i = 0; i < numberOfIterations; i++) {
+ ids[i] = new ThreadIdentifier(new byte[] {1}, i);
+ hrq.addDispatchedMessage(ids[i], i);
+ }
+
+ Thread thread1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(600);
+ } catch (InterruptedException e) {
+ errorCollector.addError(e);
}
+ list1 = HARegionQueue.createMessageListForTesting();
}
- iterator = list2.iterator();
- doOnce = false;
- id = null;
- while (iterator.hasNext()) {
- if (!doOnce) {
- iterator.next();
- iterator.next();
- doOnce = true;
- } else {
- id = (EventID) iterator.next();
- map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+ };
+
+ Thread thread2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(480);
+ } catch (InterruptedException e) {
+ errorCollector.addError(e);
+ }
+ for (int i = 0; i < numberOfIterations; i++) {
+ hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
}
}
- iterator = map.values().iterator();
- Long max = new Long(numberOfIterations);
- Long next;
- while (iterator.hasNext()) {
- next = ((Long) iterator.next());
- assertTrue(" Expected all the sequence ID's to be greater than " + max
- + " but it is not so. Got sequence id " + next, next.compareTo(max) >= 0);
+ };
+
+ thread1.start();
+ thread2.start();
+ ThreadUtils.join(thread1, 30 * 1000);
+ ThreadUtils.join(thread2, 30 * 1000);
+ List list2 = HARegionQueue.createMessageListForTesting();
+ Iterator iterator = list1.iterator();
+ boolean doOnce = false;
+ EventID id;
+ Map map = new HashMap();
+
+ while (iterator.hasNext()) {
+ if (!doOnce) {
+ iterator.next();
+ iterator.next();
+ doOnce = true;
+ } else {
+ id = (EventID) iterator.next();
+ map.put(new Long(id.getThreadID()), id.getSequenceID());
+ }
+ }
+
+ iterator = list2.iterator();
+ doOnce = false;
+
+ while (iterator.hasNext()) {
+ if (!doOnce) {
+ iterator.next();
+ iterator.next();
+ doOnce = true;
+ } else {
+ id = (EventID) iterator.next();
+ map.put(id.getThreadID(), id.getSequenceID());
}
- } catch (Exception e) {
- throw new AssertionError("Test failed due to : ", e);
+ }
+
+ iterator = map.values().iterator();
+ Long max = numberOfIterations;
+ while (iterator.hasNext()) {
+ Long next = (Long) iterator.next();
+ assertThat(
+ " Expected all the sequence ID's to be greater than " + max
+ + " but it is not so. Got sequence id " + next,
+ next.compareTo(max), greaterThanOrEqualTo(0));
}
}
@@ -958,77 +733,81 @@ public class HARegionQueueJUnitTest {
* It is then verified to see that the map size should be 2x
*/
@Test
- public void testConcurrentDispatcherAndRemovalForSameRegionDifferentThreadId() {
- try {
- final long numberOfIterations = 1000;
- final HARegionQueue hrq = createHARegionQueue("testConcurrentDispatcherAndRemoval");
- HARegionQueue.stopQRMThread();
- final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
- for (int i = 0; i < numberOfIterations; i++) {
- ids[i] = new ThreadIdentifier(new byte[] {1}, i);
- hrq.addDispatchedMessage(ids[i], i);
- }
- Thread thread1 = new Thread() {
- public void run() {
- try {
- Thread.sleep(600);
- } catch (InterruptedException e) {
- fail("interrupted");
- }
- list1 = HARegionQueue.createMessageListForTesting();
- };
- };
- Thread thread2 = new Thread() {
- public void run() {
- try {
- Thread.sleep(480);
- } catch (InterruptedException e) {
- fail("interrupted");
- }
- for (int i = 0; i < numberOfIterations; i++) {
- ids[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
- hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
- }
- };
- };
- thread1.start();
- thread2.start();
- ThreadUtils.join(thread1, 30 * 1000);
- ThreadUtils.join(thread2, 30 * 1000);
- List list2 = HARegionQueue.createMessageListForTesting();
- Iterator iterator = list1.iterator();
- boolean doOnce = false;
- EventID id = null;
- Map map = new HashMap();
- while (iterator.hasNext()) {
- if (!doOnce) {
- iterator.next();
- iterator.next();
- doOnce = true;
- } else {
- id = (EventID) iterator.next();
- map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+ public void testConcurrentDispatcherAndRemovalForSameRegionDifferentThreadId() throws Exception {
+ int numberOfIterations = 1000;
+ HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName());
+ HARegionQueue.stopQRMThread();
+ ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
+
+ for (int i = 0; i < numberOfIterations; i++) {
+ ids[i] = new ThreadIdentifier(new byte[] {1}, i);
+ hrq.addDispatchedMessage(ids[i], i);
+ }
+
+ Thread thread1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(600);
+ } catch (InterruptedException e) {
+ errorCollector.addError(e);
}
+ list1 = HARegionQueue.createMessageListForTesting();
}
- iterator = list2.iterator();
- doOnce = false;
- id = null;
- while (iterator.hasNext()) {
- if (!doOnce) {
- iterator.next();
- iterator.next();
- doOnce = true;
- } else {
- id = (EventID) iterator.next();
- map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+ };
+
+ Thread thread2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(480);
+ } catch (InterruptedException e) {
+ errorCollector.addError(e);
+ }
+ for (int i = 0; i < numberOfIterations; i++) {
+ ids[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
+ hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
}
}
- assertTrue(
- " Expected the map size to be " + (2 * numberOfIterations) + " but it is " + map.size(),
- map.size() == (2 * numberOfIterations));
- } catch (Exception e) {
- throw new AssertionError("Test failed due to an unexpected exception : ", e);
+ };
+
+ thread1.start();
+ thread2.start();
+ ThreadUtils.join(thread1, 30 * 1000);
+ ThreadUtils.join(thread2, 30 * 1000);
+ List list2 = HARegionQueue.createMessageListForTesting();
+ Iterator iterator = list1.iterator();
+ boolean doOnce = false;
+ EventID id;
+ Map map = new HashMap();
+
+ while (iterator.hasNext()) {
+ if (!doOnce) {
+ iterator.next();
+ iterator.next();
+ doOnce = true;
+ } else {
+ id = (EventID) iterator.next();
+ map.put(id.getThreadID(), id.getSequenceID());
+ }
+ }
+
+ iterator = list2.iterator();
+ doOnce = false;
+
+ while (iterator.hasNext()) {
+ if (!doOnce) {
+ iterator.next();
+ iterator.next();
+ doOnce = true;
+ } else {
+ id = (EventID) iterator.next();
+ map.put(id.getThreadID(), id.getSequenceID());
+ }
}
+ assertThat(
+ " Expected the map size to be " + 2 * numberOfIterations + " but it is " + map.size(),
+ map.size(), is(2 * numberOfIterations));
}
/**
@@ -1050,101 +829,96 @@ public class HARegionQueueJUnitTest {
* It is then verified to see that a total of x entries are present in the map
*/
@Test
- public void testConcurrentDispatcherAndRemovalForMultipleRegionsSameThreadId() {
- try {
- final long numberOfIterations = 10000;
- final HARegionQueue hrq1 = createHARegionQueue("testConcurrentDispatcherAndRemoval1");
-
- final HARegionQueue hrq2 = createHARegionQueue("testConcurrentDispatcherAndRemoval2");
+ public void testConcurrentDispatcherAndRemovalForMultipleRegionsSameThreadId() throws Exception {
+ int numberOfIterations = 10000;
+ HARegionQueue hrq1 = createHARegionQueue(this.testName.getMethodName() + "-1");
+ HARegionQueue hrq2 = createHARegionQueue(this.testName.getMethodName() + "-2");
+ HARegionQueue hrq3 = createHARegionQueue(this.testName.getMethodName() + "-3");
+ HARegionQueue hrq4 = createHARegionQueue(this.testName.getMethodName() + "-4");
+ HARegionQueue hrq5 = createHARegionQueue(this.testName.getMethodName() + "-5");
- final HARegionQueue hrq3 = createHARegionQueue("testConcurrentDispatcherAndRemoval3");
+ HARegionQueue.stopQRMThread();
- final HARegionQueue hrq4 = createHARegionQueue("testConcurrentDispatcherAndRemoval4");
+ ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
- final HARegionQueue hrq5 = createHARegionQueue("testConcurrentDispatcherAndRemoval5");
+ for (int i = 0; i < numberOfIterations; i++) {
+ ids[i] = new ThreadIdentifier(new byte[] {1}, i);
+ hrq1.addDispatchedMessage(ids[i], i);
+ hrq2.addDispatchedMessage(ids[i], i);
- HARegionQueue.stopQRMThread();
- final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
-
- for (int i = 0; i < numberOfIterations; i++) {
- ids[i] = new ThreadIdentifier(new byte[] {1}, i);
- hrq1.addDispatchedMessage(ids[i], i);
- hrq2.addDispatchedMessage(ids[i], i);
+ }
+ Thread thread1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(600);
+ } catch (InterruptedException e) {
+ errorCollector.addError(e);
+ }
+ list1 = HARegionQueue.createMessageListForTesting();
}
+ };
- Thread thread1 = new Thread() {
- public void run() {
- try {
- Thread.sleep(600);
- } catch (InterruptedException e) {
- fail("interrupted");
- }
- list1 = HARegionQueue.createMessageListForTesting();
- };
- };
- Thread thread2 = new Thread() {
- public void run() {
- try {
- Thread.sleep(480);
- } catch (InterruptedException e) {
- fail("interrupted");
- }
- for (int i = 0; i < numberOfIterations; i++) {
- hrq3.addDispatchedMessage(ids[i], i);
- hrq4.addDispatchedMessage(ids[i], i);
- hrq5.addDispatchedMessage(ids[i], i);
- }
- };
- };
- thread1.start();
- thread2.start();
- ThreadUtils.join(thread1, 30 * 1000);
- ThreadUtils.join(thread2, 30 * 1000);
- List list2 = HARegionQueue.createMessageListForTesting();
- Iterator iterator = list1.iterator();
- boolean doOnce = true;
- EventID id = null;
- Map map = new HashMap();
- while (iterator.hasNext()) {
- if (!doOnce) {
- iterator.next(); // read the total message size
- doOnce = true;
- } else {
- iterator.next();// region name;
- int size = ((Integer) iterator.next()).intValue();
- for (int i = 0; i < size; i++) {
- id = (EventID) iterator.next();
- map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
- new Long(id.getSequenceID()));
- }
+ Thread thread2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(480);
+ } catch (InterruptedException e) {
+ errorCollector.addError(e);
+ }
+ for (int i = 0; i < numberOfIterations; i++) {
+ hrq3.addDispatchedMessage(ids[i], i);
+ hrq4.addDispatchedMessage(ids[i], i);
+ hrq5.addDispatchedMessage(ids[i], i);
}
}
-
- iterator = list2.iterator();
- doOnce = true;
- id = null;
- while (iterator.hasNext()) {
- if (!doOnce) {
- iterator.next(); // read the total message size
- doOnce = true;
- } else {
- iterator.next();// region name;
- int size = ((Integer) iterator.next()).intValue();
- for (int i = 0; i < size; i++) {
- id = (EventID) iterator.next();
- map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
- new Long(id.getSequenceID()));
- }
+ };
+
+ thread1.start();
+ thread2.start();
+ ThreadUtils.join(thread1, 30 * 1000);
+ ThreadUtils.join(thread2, 30 * 1000);
+ List list2 = HARegionQueue.createMessageListForTesting();
+ Iterator iterator = list1.iterator();
+ boolean doOnce = true;
+ EventID id;
+ Map map = new HashMap();
+
+ while (iterator.hasNext()) {
+ if (!doOnce) {
+ iterator.next(); // read the total message size
+ doOnce = true;
+ } else {
+ iterator.next();// region name;
+ int size = (Integer) iterator.next();
+ for (int i = 0; i < size; i++) {
+ id = (EventID) iterator.next();
+ map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
}
}
- assertTrue(
- " Expected the map size to be " + (numberOfIterations) + " but it is " + map.size(),
- map.size() == (numberOfIterations));
+ }
- } catch (Exception e) {
- throw new AssertionError("Test failed due to : ", e);
+ iterator = list2.iterator();
+ doOnce = true;
+
+ while (iterator.hasNext()) {
+ if (!doOnce) {
+ iterator.next(); // read the total message size
+ doOnce = true;
+ } else {
+ iterator.next();// region name;
+ int size = (Integer) iterator.next();
+ for (int i = 0; i < size; i++) {
+ id = (EventID) iterator.next();
+ map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
+ }
+ }
}
+
+ assertThat(" Expected the map size to be " + numberOfIterations + " but it is " + map.size(),
+ map.size(), is(numberOfIterations));
}
/**
@@ -1168,203 +942,179 @@ public class HARegionQueueJUnitTest {
* It is then verified to see that the map size should be 2x * number of regions
*/
@Test
- public void testConcurrentDispatcherAndRemovalForMultipleRegionsDifferentThreadId() {
- try {
- final long numberOfIterations = 1000;
- final HARegionQueue hrq1 =
-
- createHARegionQueue("testConcurrentDispatcherAndRemoval1");
-
- final HARegionQueue hrq2 =
-
- createHARegionQueue("testConcurrentDispatcherAndRemoval2");
- final HARegionQueue hrq3 =
-
- createHARegionQueue("testConcurrentDispatcherAndRemoval3");
- final HARegionQueue hrq4 =
-
- createHARegionQueue("testConcurrentDispatcherAndRemoval4");
- final HARegionQueue hrq5 =
-
- createHARegionQueue("testConcurrentDispatcherAndRemoval5");
-
- HARegionQueue.stopQRMThread();
-
- final ThreadIdentifier[] ids1 = new ThreadIdentifier[(int) numberOfIterations];
- final ThreadIdentifier[] ids2 = new ThreadIdentifier[(int) numberOfIterations];
- final ThreadIdentifier[] ids3 = new ThreadIdentifier[(int) numberOfIterations];
- final ThreadIdentifier[] ids4 = new ThreadIdentifier[(int) numberOfIterations];
- final ThreadIdentifier[] ids5 = new ThreadIdentifier[(int) numberOfIterations];
+ public void testConcurrentDispatcherAndRemovalForMultipleRegionsDifferentThreadId()
+ throws Exception {
+ int numberOfIterations = 1000;
+ HARegionQueue hrq1 = createHARegionQueue(this.testName.getMethodName() + "-1");
+ HARegionQueue hrq2 = createHARegionQueue(this.testName.getMethodName() + "-2");
+ HARegionQueue hrq3 = createHARegionQueue(this.testName.getMethodName() + "-3");
+ HARegionQueue hrq4 = createHARegionQueue(this.testName.getMethodName() + "-4");
+ HARegionQueue hrq5 = createHARegionQueue(this.testName.getMethodName() + "-5");
+
+ HARegionQueue.stopQRMThread();
+
+ ThreadIdentifier[] ids1 = new ThreadIdentifier[(int) numberOfIterations];
+ ThreadIdentifier[] ids2 = new ThreadIdentifier[(int) numberOfIterations];
+ ThreadIdentifier[] ids3 = new ThreadIdentifier[(int) numberOfIterations];
+ ThreadIdentifier[] ids4 = new ThreadIdentifier[(int) numberOfIterations];
+ ThreadIdentifier[] ids5 = new ThreadIdentifier[(int) numberOfIterations];
+
+ for (int i = 0; i < numberOfIterations; i++) {
+ ids1[i] = new ThreadIdentifier(new byte[] {1}, i);
+ ids2[i] = new ThreadIdentifier(new byte[] {2}, i);
+ ids3[i] = new ThreadIdentifier(new byte[] {3}, i);
+ ids4[i] = new ThreadIdentifier(new byte[] {4}, i);
+ ids5[i] = new ThreadIdentifier(new byte[] {5}, i);
+ hrq1.addDispatchedMessage(ids1[i], i);
+ hrq2.addDispatchedMessage(ids2[i], i);
+ hrq3.addDispatchedMessage(ids3[i], i);
+ hrq4.addDispatchedMessage(ids4[i], i);
+ hrq5.addDispatchedMessage(ids5[i], i);
+ }
- for (int i = 0; i < numberOfIterations; i++) {
- ids1[i] = new ThreadIdentifier(new byte[] {1}, i);
- ids2[i] = new ThreadIdentifier(new byte[] {2}, i);
- ids3[i] = new ThreadIdentifier(new byte[] {3}, i);
- ids4[i] = new ThreadIdentifier(new byte[] {4}, i);
- ids5[i] = new ThreadIdentifier(new byte[] {5}, i);
- hrq1.addDispatchedMessage(ids1[i], i);
- hrq2.addDispatchedMessage(ids2[i], i);
- hrq3.addDispatchedMessage(ids3[i], i);
- hrq4.addDispatchedMessage(ids4[i], i);
- hrq5.addDispatchedMessage(ids5[i], i);
+ Thread thread1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(600);
+ } catch (InterruptedException e) {
+ errorCollector.addError(e);
+ }
+ list1 = HARegionQueue.createMessageListForTesting();
}
+ };
- Thread thread1 = new Thread() {
- public void run() {
- try {
- Thread.sleep(600);
- } catch (InterruptedException e) {
- fail("interrupted");
- }
- list1 = HARegionQueue.createMessageListForTesting();
- };
- };
- Thread thread2 = new Thread() {
- public void run() {
- try {
- Thread.sleep(480);
- } catch (InterruptedException e) {
- fail("Interrupted");
- }
- for (int i = 0; i < numberOfIterations; i++) {
- ids1[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
- ids2[i] = new ThreadIdentifier(new byte[] {2}, i + numberOfIterations);
- ids3[i] = new ThreadIdentifier(new byte[] {3}, i + numberOfIterations);
- ids4[i] = new ThreadIdentifier(new byte[] {4}, i + numberOfIterations);
- ids5[i] = new ThreadIdentifier(new byte[] {5}, i + numberOfIterations);
-
- hrq1.addDispatchedMessage(ids1[i], i + numberOfIterations);
- hrq2.addDispatchedMessage(ids2[i], i + numberOfIterations);
- hrq3.addDispatchedMessage(ids3[i], i + numberOfIterations);
- hrq4.addDispatchedMessage(ids4[i], i + numberOfIterations);
- hrq5.addDispatchedMessage(ids5[i], i + numberOfIterations);
- }
- };
- };
- thread1.start();
- thread2.start();
- ThreadUtils.join(thread1, 30 * 1000);
- ThreadUtils.join(thread2, 30 * 1000);
- List list2 = HARegionQueue.createMessageListForTesting();
- Iterator iterator = list1.iterator();
- boolean doOnce = true;
- EventID id = null;
- Map map = new HashMap();
- while (iterator.hasNext()) {
- if (!doOnce) {
- iterator.next(); // read the total message size
- doOnce = true;
- } else {
- iterator.next();// region name;
- int size = ((Integer) iterator.next()).intValue();
- System.out.println(" size of list 1 iteration x " + size);
- for (int i = 0; i < size; i++) {
-
- id = (EventID) iterator.next();
- map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
- new Long(id.getSequenceID()));
- }
+ Thread thread2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(480);
+ } catch (InterruptedException e) {
+ errorCollector.addError(e);
+ }
+ for (int i = 0; i < numberOfIterations; i++) {
+ ids1[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
+ ids2[i] = new ThreadIdentifier(new byte[] {2}, i + numberOfIterations);
+ ids3[i] = new ThreadIdentifier(new byte[] {3}, i + numberOfIterations);
+ ids4[i] = new ThreadIdentifier(new byte[] {4}, i + numberOfIterations);
+ ids5[i] = new ThreadIdentifier(new byte[] {5}, i + numberOfIterations);
+
+ hrq1.addDispatchedMessage(ids1[i], i + numberOfIterations);
+ hrq2.addDispatchedMessage(ids2[i], i + numberOfIterations);
+ hrq3.addDispatchedMessage(ids3[i], i + numberOfIterations);
+ hrq4.addDispatchedMessage(ids4[i], i + numberOfIterations);
+ hrq5.addDispatchedMessage(ids5[i], i + numberOfIterations);
}
}
-
- iterator = list2.iterator();
- doOnce = true;
- id = null;
- while (iterator.hasNext()) {
- if (!doOnce) {
- iterator.next(); // read the total message size
- doOnce = true;
- } else {
- iterator.next();// region name;
- int size = ((Integer) iterator.next()).intValue();
- System.out.println(" size of list 2 iteration x " + size);
- for (int i = 0; i < size; i++) {
- id = (EventID) iterator.next();
- map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
- new Long(id.getSequenceID()));
- }
+ };
+
+ thread1.start();
+ thread2.start();
+ ThreadUtils.join(thread1, 30 * 1000);
+ ThreadUtils.join(thread2, 30 * 1000);
+ List list2 = HARegionQueue.createMessageListForTesting();
+ Iterator iterator = list1.iterator();
+ boolean doOnce = true;
+ EventID id = null;
+ Map map = new HashMap();
+
+ while (iterator.hasNext()) {
+ if (!doOnce) {
+ iterator.next(); // read the total message size
+ doOnce = true;
+ } else {
+ iterator.next(); // region name;
+ int size = (Integer) iterator.next();
+ System.out.println(" size of list 1 iteration x " + size);
+ for (int i = 0; i < size; i++) {
+ id = (EventID) iterator.next();
+ map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
}
}
+ }
- assertTrue(" Expected the map size to be " + (numberOfIterations * 2 * 5) + " but it is "
- + map.size(), map.size() == (numberOfIterations * 2 * 5));
+ iterator = list2.iterator();
+ doOnce = true;
- } catch (Exception e) {
- throw new AssertionError("Test failed due to : ", e);
+ while (iterator.hasNext()) {
+ if (!doOnce) {
+ iterator.next(); // read the total message size
+ doOnce = true;
+ } else {
+ iterator.next(); // region name;
+ int size = (Integer) iterator.next();
+ System.out.println(" size of list 2 iteration x " + size);
+ for (int i = 0; i < size; i++) {
+ id = (EventID) iterator.next();
+ map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
+ }
+ }
}
+
+ assertThat(
+ " Expected the map size to be " + numberOfIterations * 2 * 5 + " but it is " + map.size(),
+ map.size(), is(numberOfIterations * 2 * 5));
}
/**
- * Concurrent Peek on Blokcing Queue waiting with for a Put . If concurrent take is also happening
+ * Concurrent Peek on Blocking Queue waiting with for a Put . If concurrent take is also happening
* such that the object is removed first then the peek should block & not return with null.
*/
@Test
- public void testBlockingQueueForConcurrentPeekAndTake() {
- exceptionInThread = false;
- testFailed = false;
- try {
- final TestBlockingHARegionQueue bQ =
- new TestBlockingHARegionQueue("testBlockQueueForConcurrentPeekAndTake", cache);
- Thread[] threads = new Thread[3];
- for (int i = 0; i < 3; i++) {
- threads[i] = new Thread() {
- public void run() {
- try {
- long startTime = System.currentTimeMillis();
- Object obj = bQ.peek();
- if (obj == null) {
- testFailed = true;
- message.append(
- " Failed : failed since object was null and was not expected to be null \n");
- }
- long totalTime = System.currentTimeMillis() - startTime;
+ public void testBlockingQueueForConcurrentPeekAndTake() throws Exception {
+ TestBlockingHARegionQueue regionQueue =
+ new TestBlockingHARegionQueue("testBlockQueueForConcurrentPeekAndTake", this.cache);
+ Thread[] threads = new Thread[3];
+
+ for (int i = 0; i < 3; i++) {
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ long startTime = System.currentTimeMillis();
+ Object obj = regionQueue.peek();
+ if (obj == null) {
+ errorCollector.addError(new AssertionError(
+ "Failed : failed since object was null and was not expected to be null"));
+ }
+ long totalTime = System.currentTimeMillis() - startTime;
- if (totalTime < 4000) {
- testFailed = true;
- message
- .append(" Failed : Expected time to be greater than 4000 but it is not so ");
- }
- } catch (Exception e) {
- exceptionInThread = true;
- exception = e;
+ if (totalTime < 4000) {
+ errorCollector.addError(new AssertionError(
+ "Failed : Expected time to be greater than 4000 but it is not so"));
}
+ } catch (Exception e) {
+ errorCollector.addError(e);
}
- };
-
- }
-
- for (int k = 0; k < 3; k++) {
- threads[k].start();
- }
- Thread.sleep(4000);
-
- EventID id = new EventID(new byte[] {1}, 1, 1);
- EventID id1 = new EventID(new byte[] {1}, 1, 2);
+ }
+ };
+ }
- bQ.takeFirst = true;
- bQ.put(new ConflatableObject("key", "value", id, true, "testing"));
+ for (int k = 0; k < 3; k++) {
+ threads[k].start();
+ }
- Thread.sleep(2000);
+ Thread.sleep(4000);
- bQ.put(new ConflatableObject("key1", "value1", id1, true, "testing"));
+ EventID id = new EventID(new byte[] {1}, 1, 1);
+ EventID id1 = new EventID(new byte[] {1}, 1, 2);
- long startTime = System.currentTimeMillis();
- for (int k = 0; k < 3; k++) {
- ThreadUtils.join(threads[k], 180 * 1000);
- }
+ regionQueue.takeFirst = true;
+ regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName()));
- long totalTime = System.currentTimeMillis() - startTime;
+ Thread.sleep(2000);
- if (totalTime >= 180000) {
- fail(" Test taken too long ");
- }
+ regionQueue
+ .put(new ConflatableObject("key1", "value1", id1, true, this.testName.getMethodName()));
- if (testFailed) {
- fail(" test failed due to " + message);
- }
+ long startTime = System.currentTimeMillis();
+ for (int k = 0; k < 3; k++) {
+ ThreadUtils.join(threads[k], 180 * 1000);
+ }
- } catch (Exception e) {
- throw new AssertionError(" Test failed due to ", e);
+ long totalTime = System.currentTimeMillis() - startTime;
+ if (totalTime >= 180000) {
+ fail(" Test taken too long ");
}
}
@@ -1373,71 +1123,60 @@ public class HARegionQueueJUnitTest {
* QRM thread , the peek should block correctly.
*/
@Test
- public void testBlockingQueueForTakeWhenPeekInProgress() {
- exceptionInThread = false;
- testFailed = false;
- try {
- final TestBlockingHARegionQueue bQ =
- new TestBlockingHARegionQueue("testBlockQueueForTakeWhenPeekInProgress", cache);
- Thread[] threads = new Thread[3];
- for (int i = 0; i < 3; i++) {
- threads[i] = new Thread() {
- public void run() {
- try {
- long startTime = System.currentTimeMillis();
- Object obj = bQ.peek();
- if (obj == null) {
- testFailed = true;
- message.append(
- " Failed : failed since object was null and was not expected to be null \n");
- }
- long totalTime = System.currentTimeMillis() - startTime;
+ public void testBlockingQueueForTakeWhenPeekInProgress() throws Exception {
+ TestBlockingHARegionQueue regionQueue =
+ new TestBlockingHARegionQueue("testBlockQueueForTakeWhenPeekInProgress", this.cache);
+ Thread[] threads = new Thread[3];
+
+ for (int i = 0; i < 3; i++) {
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ long startTime = System.currentTimeMillis();
+ Object obj = regionQueue.peek();
+ if (obj == null) {
+ errorCollector.addError(new AssertionError(
+ "Failed : failed since object was null and was not expected to be null"));
+ }
+ long totalTime = System.currentTimeMillis() - startTime;
- if (totalTime < 4000) {
- testFailed = true;
- message
- .append(" Failed : Expected time to be greater than 4000 but it is not so ");
- }
- } catch (Exception e) {
- exceptionInThread = true;
- exception = e;
+ if (totalTime < 4000) {
+ errorCollector.addError(new AssertionError(
+ "Failed : Expected time to be greater than 4000 but it is not so"));
}
+ } catch (Exception e) {
+ errorCollector.addError(e);
}
- };
- }
-
- for (int k = 0; k < 3; k++) {
- threads[k].start();
- }
- Thread.sleep(4000);
-
- EventID id = new EventID(new byte[] {1}, 1, 1);
- EventID id1 = new EventID(new byte[] {1}, 1, 2);
+ }
+ };
+ }
- bQ.takeWhenPeekInProgress = true;
- bQ.put(new ConflatableObject("key", "value", id, true, "testing"));
+ for (int k = 0; k < 3; k++) {
+ threads[k].start();
+ }
- Thread.sleep(2000);
+ Thread.sleep(4000);
- bQ.put(new ConflatableObject("key1", "value1", id1, true, "testing"));
+ EventID id = new EventID(new byte[] {1}, 1, 1);
+ EventID id1 = new EventID(new byte[] {1}, 1, 2);
- long startTime = System.currentTimeMillis();
- for (int k = 0; k < 3; k++) {
- ThreadUtils.join(threads[k], 60 * 1000);
- }
+ regionQueue.takeWhenPeekInProgress = true;
+ regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName()));
- long totalTime = System.currentTimeMillis() - startTime;
+ Thread.sleep(2000);
- if (totalTime >= 60000) {
- fail(" Test taken too long ");
- }
+ regionQueue
+ .put(new ConflatableObject("key1", "value1", id1, true, this.testName.getMethodName()));
- if (testFailed) {
- fail(" test failed due to " + message);
- }
+ long startTime = System.currentTimeMillis();
+ for (int k = 0; k < 3; k++) {
+ ThreadUtils.join(threads[k], 60 * 1000);
+ }
- } catch (Exception e) {
- throw new AssertionError(" Test failed due to ", e);
+ long totalTime = System.currentTimeMillis() - startTime;
+ if (totalTime >= 60000) {
+ fail(" Test taken too long ");
}
}
@@ -1451,138 +1190,88 @@ public class HARegionQueueJUnitTest {
* violation. This test will validate that behaviour
*/
@Test
- public void testConcurrentEventExpiryAndTake() {
- try {
- HARegionQueueAttributes haa = new HARegionQueueAttributes();
- haa.setExpiryTime(3);
- final RegionQueue regionqueue =
- new HARegionQueue.TestOnlyHARegionQueue("testing", cache, haa) {
- CacheListener createCacheListenerForHARegion() {
+ public void testConcurrentEventExpiryAndTake() throws Exception {
+ AtomicBoolean complete = new AtomicBoolean(false);
+ AtomicBoolean expiryCalled = new AtomicBoolean(false);
+ AtomicBoolean allowExpiryToProceed = new AtomicBoolean(false);
- return new CacheListenerAdapter() {
+ HARegionQueueAttributes haa = new HARegionQueueAttributes();
+ haa.setExpiryTime(3);
- public void afterInvalidate(EntryEvent event) {
+ RegionQueue regionqueue =
+ new HARegionQueue.TestOnlyHARegionQueue(this.testName.getMethodName(), this.cache, haa) {
+ @Override
+ CacheListener createCacheListenerForHARegion() {
- if (event.getKey() instanceof Long) {
- synchronized (HARegionQueueJUnitTest.this) {
- expiryCalled = true;
- HARegionQueueJUnitTest.this.notify();
+ return new CacheListenerAdapter() {
- } ;
- Thread.yield();
+ @Override
+ public void afterInvalidate(EntryEvent event) {
- synchronized (HARegionQueueJUnitTest.this) {
- if (!allowExpiryToProceed) {
- try {
- HARegionQueueJUnitTest.this.wait();
- } catch (InterruptedException e1) {
- encounteredException = true;
- }
+ if (event.getKey() instanceof Long) {
+ synchronized (HARegionQueueJUnitTest.this) {
+ expiryCalled.set(true);
+ HARegionQueueJUnitTest.this.notifyAll();
+ }
+
+ Thread.yield();
+
+ synchronized (HARegionQueueJUnitTest.this) {
+ while (!allowExpiryToProceed.get()) {
+ try {
+ HARegionQueueJUnitTest.this.wait();
+ } catch (InterruptedException e) {
+ errorCollector.addError(e);
+ break;
}
}
- try {
- expireTheEventOrThreadIdentifier(event);
- } catch (CacheException e) {
- e.printStackTrace();
- encounteredException = true;
- } finally {
- synchronized (HARegionQueueJUnitTest.this) {
- complete = true;
- HARegionQueueJUnitTest.this.notify();
- }
+ }
+
+ try {
+ expireTheEventOrThreadIdentifier(event);
+ } catch (CacheException e) {
+ errorCollector.addError(e);
+ } finally {
+ synchronized (HARegionQueueJUnitTest.this) {
+ complete.set(true);
+ HARegionQueueJUnitTest.this.notifyAll();
}
}
}
- };
- }
- };
- EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+ }
+ };
+ }
+ };
- Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing");
+ EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+ Conflatable cf1 =
+ new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName());
+ regionqueue.put(cf1);
- regionqueue.put(cf1);
- synchronized (this) {
- if (!expiryCalled) {
- this.wait();
- }
- }
- try {
- Object o = regionqueue.take();
- assertNull(o);
- } catch (Exception e) {
- throw new AssertionError("Test failed due to exception ", e);
- } finally {
- synchronized (this) {
- this.allowExpiryToProceed = true;
- this.notify();
- }
+ synchronized (this) {
+
<TRUNCATED>