You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/30 17:42:30 UTC
[38/43] geode git commit: Revert BlockingHARegionJUnitTest
Revert BlockingHARegionJUnitTest
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/91c13dab
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/91c13dab
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/91c13dab
Branch: refs/heads/feature/GEODE-2632-17
Commit: 91c13dabf3d5591d8e08cfeddf3b13feb7058503
Parents: 4f6a7a7
Author: Kirk Lund <kl...@apache.org>
Authored: Wed May 24 19:13:53 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Tue May 30 10:21:11 2017 -0700
----------------------------------------------------------------------
.../cache/ha/BlockingHARegionJUnitTest.java | 494 ++++++++++---------
1 file changed, 270 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/91c13dab/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
index 1534192..d0f5793 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
@@ -14,116 +14,76 @@
*/
package org.apache.geode.internal.cache.ha;
-import static java.util.concurrent.TimeUnit.*;
import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.apache.geode.internal.cache.ha.HARegionQueue.*;
import static org.junit.Assert.*;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Properties;
-import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionFactory;
-import org.junit.After;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
import org.junit.Before;
import org.junit.Ignore;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.dunit.ThreadUtils;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.junit.categories.IntegrationTest;
-import org.apache.geode.test.junit.rules.serializable.SerializableErrorCollector;
-/**
- * Integration tests for Blocking HARegionQueue.
- *
- * <p>
- * #40314: Filled up queue causes all publishers to block
- *
- * <p>
- * #37627: In case of out of order messages, (sequence Id violation), in spite of HARQ not full, the
- * capacity (putPermits) of the HARQ exhausted.
- */
@Category({IntegrationTest.class, ClientSubscriptionTest.class})
public class BlockingHARegionJUnitTest {
- public static final String REGION = "BlockingHARegionJUnitTest_Region";
- private static final long THREAD_TIMEOUT = 2 * 60 * 1000;
-
- private final Object numberForThreadsLock = new Object();
- private int numberForDoPuts;
- private int numberForDoTakes;
-
- volatile boolean stopThreads;
+ private static InternalCache cache = null;
- private InternalCache cache;
- private HARegionQueueAttributes queueAttributes;
- private List<Thread> threads;
- private ThreadGroup threadGroup;
-
- @Rule
- public SerializableErrorCollector errorCollector = new SerializableErrorCollector();
+ /** boolean to record an exception occurrence in another thread **/
+ private static volatile boolean exceptionOccurred = false;
+ /** StringBuffer to store the exception **/
+ private static StringBuffer exceptionString = new StringBuffer();
+ /** boolean to quit the for loop **/
+ private static volatile boolean quitForLoop = false;
@Before
public void setUp() throws Exception {
- synchronized (this.numberForThreadsLock) {
- this.numberForDoPuts = 0;
- this.numberForDoTakes = 0;
- }
-
- this.stopThreads = false;
- this.threads = new ArrayList<>();
- this.threadGroup = new ThreadGroup(getClass().getSimpleName()) {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- errorCollector.addError(e);
- }
- };
-
- this.queueAttributes = new HARegionQueueAttributes();
-
- Properties config = new Properties();
- config.setProperty(MCAST_PORT, "0");
-
- this.cache = (InternalCache) CacheFactory.create(DistributedSystem.connect(config));
- }
-
- @After
- public void tearDown() throws Exception {
- try {
- this.stopThreads = true;
- for (Thread thread : this.threads) {
- thread.interrupt();
- ThreadUtils.join(thread, THREAD_TIMEOUT);
- }
- } finally {
- if (this.cache != null) {
- this.cache.close();
- }
+ Properties props = new Properties();
+ props.setProperty(MCAST_PORT, "0");
+ if (cache != null) {
+ cache.close(); // fault tolerance
}
+ cache = (InternalCache) CacheFactory.create(DistributedSystem.connect(props));
}
/**
- * This test has a scenario where the HARegionQueue capacity is just 1. There will be two thread.
+ * This test has a scenario where the HARegionQueue capacity is just 1. There will be two threads.
* One doing a 1000 puts and the other doing a 1000 takes. The validation for this test is that it
* should not encounter any exceptions
*/
@Test
public void testBoundedPuts() throws Exception {
- this.queueAttributes.setBlockingQueueCapacity(1);
- HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes,
- BLOCKING_HA_QUEUE, false);
- hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
+ exceptionOccurred = false;
+ HARegionQueueAttributes harqa = new HARegionQueueAttributes();
+ harqa.setBlockingQueueCapacity(1);
+ HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance("BlockingHARegionJUnitTest_Region",
+ cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
+ hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+ Thread thread1 = new DoPuts(hrq, 1000);
+ Thread thread2 = new DoTake(hrq, 1000);
+
+ thread1.start();
+ thread2.start();
+
+ ThreadUtils.join(thread1, 30 * 1000);
+ ThreadUtils.join(thread2, 30 * 1000);
+
+ if (exceptionOccurred) {
+ fail(" Test failed due to " + exceptionString);
+ }
- startDoPuts(hrq, 1000);
- startDoTakes(hrq, 1000);
+ cache.close();
}
/**
@@ -136,23 +96,62 @@ public class BlockingHARegionJUnitTest {
*/
@Test
public void testPutBeingBlocked() throws Exception {
- this.queueAttributes.setBlockingQueueCapacity(1);
- HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes,
- BLOCKING_HA_QUEUE, false);
- hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
+ exceptionOccurred = false;
+ quitForLoop = false;
+ HARegionQueueAttributes harqa = new HARegionQueueAttributes();
+ harqa.setBlockingQueueCapacity(1);
+ final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
+ "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
+ hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+ final Thread thread1 = new DoPuts(hrq, 2);
+ thread1.start();
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return hrq.region.size() == 2;
+ }
- Thread doPuts = startDoPuts(hrq, 2);
+ public String description() {
+ return null;
+ }
+ };
+ Wait.waitForCriterion(ev, 1000, 200, true);
+ assertTrue(thread1.isAlive()); // thread should still be alive (in wait state)
+
+ Thread thread2 = new DoTake(hrq, 1);
+ thread2.start(); // start take thread
+ ev = new WaitCriterion() {
+ public boolean done() {
+ return hrq.region.size() == 3;
+ }
- await().until(() -> assertTrue(hrq.region.size() == 2));
+ public String description() {
+ return null;
+ }
+ };
+ // sleep. take will proceed and so will sleeping put
+ Wait.waitForCriterion(ev, 3 * 1000, 200, true);
- // thread should still be alive (in wait state)
- assertTrue(doPuts.isAlive());
+ // thread should have died since put should have proceeded
+ ev = new WaitCriterion() {
+ public boolean done() {
+ return !thread1.isAlive();
+ }
- startDoTakes(hrq, 1);
+ public String description() {
+ return "thread1 still alive";
+ }
+ };
+ Wait.waitForCriterion(ev, 30 * 1000, 1000, true);
- await().until(() -> assertTrue(hrq.region.size() == 3));
+ ThreadUtils.join(thread1, 30 * 1000); // for completeness
+ ThreadUtils.join(thread2, 30 * 1000);
+ if (exceptionOccurred) {
+ fail(" Test failed due to " + exceptionString);
+ }
+ cache.close();
}
+
/**
* This test tests that the region capacity is never exceeded even in highly concurrent
* environments. The region capacity is set to 10000. Then 5 threads start doing put
@@ -162,26 +161,62 @@ public class BlockingHARegionJUnitTest {
*/
@Test
public void testConcurrentPutsNotExceedingLimit() throws Exception {
- this.queueAttributes.setBlockingQueueCapacity(10000);
- HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes,
- BLOCKING_HA_QUEUE, false);
- hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
-
- Thread doPuts1 = startDoPuts(hrq, 20000, 1);
- Thread doPuts2 = startDoPuts(hrq, 20000, 2);
- Thread doPuts3 = startDoPuts(hrq, 20000, 3);
- Thread doPuts4 = startDoPuts(hrq, 20000, 4);
- Thread doPuts5 = startDoPuts(hrq, 20000, 5);
+ exceptionOccurred = false;
+ quitForLoop = false;
+ HARegionQueueAttributes harqa = new HARegionQueueAttributes();
+ harqa.setBlockingQueueCapacity(10000);
+ final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
+ "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
+ hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+ Thread thread1 = new DoPuts(hrq, 20000, 1);
+ Thread thread2 = new DoPuts(hrq, 20000, 2);
+ Thread thread3 = new DoPuts(hrq, 20000, 3);
+ Thread thread4 = new DoPuts(hrq, 20000, 4);
+ Thread thread5 = new DoPuts(hrq, 20000, 5);
+
+ thread1.start();
+ thread2.start();
+ thread3.start();
+ thread4.start();
+ thread5.start();
+
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return hrq.region.size() == 20000;
+ }
- await().until(() -> assertTrue(hrq.region.size() == 20000));
+ public String description() {
+ return null;
+ }
+ };
+ Wait.waitForCriterion(ev, 30 * 1000, 200, true);
- assertTrue(doPuts1.isAlive());
- assertTrue(doPuts2.isAlive());
- assertTrue(doPuts3.isAlive());
- assertTrue(doPuts4.isAlive());
- assertTrue(doPuts5.isAlive());
+ assertTrue(thread1.isAlive());
+ assertTrue(thread2.isAlive());
+ assertTrue(thread3.isAlive());
+ assertTrue(thread4.isAlive());
+ assertTrue(thread5.isAlive());
assertTrue(hrq.region.size() == 20000);
+
+ quitForLoop = true;
+ Thread.sleep(20000);
+
+ thread1.interrupt();
+ thread2.interrupt();
+ thread3.interrupt();
+ thread4.interrupt();
+ thread5.interrupt();
+
+ Thread.sleep(2000);
+
+ ThreadUtils.join(thread1, 5 * 60 * 1000);
+ ThreadUtils.join(thread2, 5 * 60 * 1000);
+ ThreadUtils.join(thread3, 5 * 60 * 1000);
+ ThreadUtils.join(thread4, 5 * 60 * 1000);
+ ThreadUtils.join(thread5, 5 * 60 * 1000);
+
+ cache.close();
}
/**
@@ -191,41 +226,84 @@ public class BlockingHARegionJUnitTest {
* state. the region size would be verified to be 20000 (10000 puts and 10000 DACE objects). then
* the threads are interrupted and made to quit the loop
*/
- @Ignore("Test is disabled until/if blocking queue capacity becomes a hard limit")
+ @Ignore("TODO: test is disabled")
@Test
public void testConcurrentPutsTakesNotExceedingLimit() throws Exception {
- this.queueAttributes.setBlockingQueueCapacity(10000);
- HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes,
- BLOCKING_HA_QUEUE, false);
- hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
-
- Thread doPuts1 = startDoPuts(hrq, 40000, 1);
- Thread doPuts2 = startDoPuts(hrq, 40000, 2);
- Thread doPuts3 = startDoPuts(hrq, 40000, 3);
- Thread doPuts4 = startDoPuts(hrq, 40000, 4);
- Thread doPuts5 = startDoPuts(hrq, 40000, 5);
-
- Thread doTakes1 = startDoTakes(hrq, 5000);
- Thread doTakes2 = startDoTakes(hrq, 5000);
- Thread doTakes3 = startDoTakes(hrq, 5000);
- Thread doTakes4 = startDoTakes(hrq, 5000);
- Thread doTakes5 = startDoTakes(hrq, 5000);
-
- ThreadUtils.join(doTakes1, 30 * 1000);
- ThreadUtils.join(doTakes2, 30 * 1000);
- ThreadUtils.join(doTakes3, 30 * 1000);
- ThreadUtils.join(doTakes4, 30 * 1000);
- ThreadUtils.join(doTakes5, 30 * 1000);
-
- await().until(() -> assertTrue(hrq.region.size() == 20000));
-
- assertTrue(doPuts1.isAlive());
- assertTrue(doPuts2.isAlive());
- assertTrue(doPuts3.isAlive());
- assertTrue(doPuts4.isAlive());
- assertTrue(doPuts5.isAlive());
+ exceptionOccurred = false;
+ quitForLoop = false;
+ HARegionQueueAttributes harqa = new HARegionQueueAttributes();
+ harqa.setBlockingQueueCapacity(10000);
+ final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
+ "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
+ Thread thread1 = new DoPuts(hrq, 40000, 1);
+ Thread thread2 = new DoPuts(hrq, 40000, 2);
+ Thread thread3 = new DoPuts(hrq, 40000, 3);
+ Thread thread4 = new DoPuts(hrq, 40000, 4);
+ Thread thread5 = new DoPuts(hrq, 40000, 5);
+
+ Thread thread6 = new DoTake(hrq, 5000);
+ Thread thread7 = new DoTake(hrq, 5000);
+ Thread thread8 = new DoTake(hrq, 5000);
+ Thread thread9 = new DoTake(hrq, 5000);
+ Thread thread10 = new DoTake(hrq, 5000);
+
+ thread1.start();
+ thread2.start();
+ thread3.start();
+ thread4.start();
+ thread5.start();
+
+ thread6.start();
+ thread7.start();
+ thread8.start();
+ thread9.start();
+ thread10.start();
+
+ ThreadUtils.join(thread6, 30 * 1000);
+ ThreadUtils.join(thread7, 30 * 1000);
+ ThreadUtils.join(thread8, 30 * 1000);
+ ThreadUtils.join(thread9, 30 * 1000);
+ ThreadUtils.join(thread10, 30 * 1000);
+
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return hrq.region.size() == 20000;
+ }
+
+ public String description() {
+ return null;
+ }
+ };
+ Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+
+ assertTrue(thread1.isAlive());
+ assertTrue(thread2.isAlive());
+ assertTrue(thread3.isAlive());
+ assertTrue(thread4.isAlive());
+ assertTrue(thread5.isAlive());
assertTrue(hrq.region.size() == 20000);
+
+ quitForLoop = true;
+
+ Thread.sleep(2000);
+
+ thread1.interrupt();
+ thread2.interrupt();
+ thread3.interrupt();
+ thread4.interrupt();
+ thread5.interrupt();
+
+ Thread.sleep(2000);
+
+
+ ThreadUtils.join(thread1, 30 * 1000);
+ ThreadUtils.join(thread2, 30 * 1000);
+ ThreadUtils.join(thread3, 30 * 1000);
+ ThreadUtils.join(thread4, 30 * 1000);
+ ThreadUtils.join(thread5, 30 * 1000);
+
+ cache.close();
}
/**
@@ -237,92 +315,62 @@ public class BlockingHARegionJUnitTest {
*/
@Test
public void testHARQMaxCapacity_Bug37627() throws Exception {
- this.queueAttributes.setBlockingQueueCapacity(1);
- this.queueAttributes.setExpiryTime(180);
- HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes,
- BLOCKING_HA_QUEUE, false);
- hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
-
- EventID event1 = new EventID(new byte[] {1}, 1, 2); // violation
- EventID event2 = new EventID(new byte[] {1}, 1, 1); // ignored
- EventID event3 = new EventID(new byte[] {1}, 1, 3);
-
- newThread(new Runnable() {
- @Override
- public void run() {
- try {
- hrq.put(new ConflatableObject("key1", "value1", event1, false, "region1"));
- hrq.take();
- hrq.put(new ConflatableObject("key2", "value1", event2, false, "region1"));
- hrq.put(new ConflatableObject("key3", "value1", event3, false, "region1"));
- } catch (Exception e) {
- errorCollector.addError(e);
+ try {
+ exceptionOccurred = false;
+ quitForLoop = false;
+ HARegionQueueAttributes harqa = new HARegionQueueAttributes();
+ harqa.setBlockingQueueCapacity(1);
+ harqa.setExpiryTime(180);
+ final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
+ "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
+ hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+ final EventID id1 = new EventID(new byte[] {1}, 1, 2); // violation
+ final EventID ignore = new EventID(new byte[] {1}, 1, 1); //
+ final EventID id2 = new EventID(new byte[] {1}, 1, 3); //
+ Thread t1 = new Thread() {
+ public void run() {
+ try {
+ hrq.put(new ConflatableObject("key1", "value1", id1, false, "region1"));
+ hrq.take();
+ hrq.put(new ConflatableObject("key2", "value1", ignore, false, "region1"));
+ hrq.put(new ConflatableObject("key3", "value1", id2, false, "region1"));
+ } catch (Exception e) {
+ exceptionString.append("First Put in region queue failed");
+ exceptionOccurred = true;
+ }
}
+ };
+ t1.start();
+ ThreadUtils.join(t1, 20 * 1000);
+ if (exceptionOccurred) {
+ fail(" Test failed due to " + exceptionString);
+ }
+ } finally {
+ if (cache != null) {
+ cache.close();
}
- });
- }
-
- private Thread newThread(Runnable runnable) {
- Thread thread = new Thread(this.threadGroup, runnable);
- this.threads.add(thread);
- thread.start();
- return thread;
- }
-
- private Thread startDoPuts(HARegionQueue haRegionQueue, int count) {
- return startDoPuts(haRegionQueue, count, 0);
- }
-
- private Thread startDoPuts(HARegionQueue haRegionQueue, int count, int regionId) {
- Thread thread = new DoPuts(this.threadGroup, haRegionQueue, count, regionId);
- this.threads.add(thread);
- thread.start();
- return thread;
- }
-
- private Thread startDoTakes(HARegionQueue haRegionQueue, int count) {
- Thread thread = new DoTakes(this.threadGroup, haRegionQueue, count);
- this.threads.add(thread);
- thread.start();
- return thread;
- }
-
- private ConditionFactory await() {
- return Awaitility.await().atMost(2, MINUTES);
- }
-
- int nextDoPutsThreadNum() {
- synchronized (this.numberForThreadsLock) {
- return numberForDoPuts++;
- }
- }
-
- int nextDoTakesThreadNum() {
- synchronized (this.numberForThreadsLock) {
- return numberForDoTakes++;
}
}
/**
* class which does specified number of puts on the queue
*/
- private class DoPuts extends Thread {
+ private static class DoPuts extends Thread {
- private final HARegionQueue regionQueue;
+ HARegionQueue regionQueue = null;
+ final int numberOfPuts;
- private final int numberOfPuts;
+ DoPuts(HARegionQueue haRegionQueue, int numberOfPuts) {
+ this.regionQueue = haRegionQueue;
+ this.numberOfPuts = numberOfPuts;
+ }
/**
* region id can be specified to generate Thread unique events
*/
- private final int regionId;
+ int regionId = 0;
- DoPuts(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfPuts) {
- this(threadGroup, haRegionQueue, numberOfPuts, 0);
- }
-
- DoPuts(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfPuts, int regionId) {
- super(threadGroup, "DoPuts-" + nextDoPutsThreadNum());
+ DoPuts(HARegionQueue haRegionQueue, int numberOfPuts, int regionId) {
this.regionQueue = haRegionQueue;
this.numberOfPuts = numberOfPuts;
this.regionId = regionId;
@@ -330,16 +378,19 @@ public class BlockingHARegionJUnitTest {
@Override
public void run() {
- for (int i = 0; i < this.numberOfPuts; i++) {
- if (stopThreads || Thread.currentThread().isInterrupted()) {
- break;
- }
+ for (int i = 0; i < numberOfPuts; i++) {
try {
this.regionQueue.put(new ConflatableObject("" + i, "" + i,
- new EventID(new byte[this.regionId], i, i), false, REGION));
+ new EventID(new byte[regionId], i, i), false, "BlockingHARegionJUnitTest_Region"));
+ if (quitForLoop) {
+ break;
+ }
+ if (Thread.currentThread().isInterrupted()) {
+ break;
+ }
} catch (Exception e) {
- errorCollector.addError(e);
- break;
+ exceptionOccurred = true;
+ exceptionString.append(" Exception occurred due to " + e);
}
}
}
@@ -348,29 +399,24 @@ public class BlockingHARegionJUnitTest {
/**
* class which does a specified number of takes
*/
- private class DoTakes extends Thread {
+ private static class DoTake extends Thread {
- private final HARegionQueue regionQueue;
+ final HARegionQueue regionQueue;
+ final int numberOfTakes;
- private final int numberOfTakes;
-
- DoTakes(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfTakes) {
- super(threadGroup, "DoTakes-" + nextDoTakesThreadNum());
+ DoTake(HARegionQueue haRegionQueue, int numberOfTakes) {
this.regionQueue = haRegionQueue;
this.numberOfTakes = numberOfTakes;
}
@Override
public void run() {
- for (int i = 0; i < this.numberOfTakes; i++) {
- if (stopThreads || Thread.currentThread().isInterrupted()) {
- break;
- }
+ for (int i = 0; i < numberOfTakes; i++) {
try {
assertNotNull(this.regionQueue.take());
} catch (Exception e) {
- errorCollector.addError(e);
- break;
+ exceptionOccurred = true;
+ exceptionString.append(" Exception occurred due to " + e);
}
}
}