You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/24 23:41:48 UTC
[31/31] geode git commit: Cleanup test that recursed infinitely due
to failure in precheckin
Cleanup test that recursed infinitely due to failure in precheckin
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/00b9eb87
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/00b9eb87
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/00b9eb87
Branch: refs/heads/feature/GEODE-2632-17
Commit: 00b9eb87e06d9ec7db5043662dfbe9ab38c4927e
Parents: 1f178ef
Author: Kirk Lund <kl...@apache.org>
Authored: Wed May 24 16:40:29 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed May 24 16:41:09 2017 -0700
----------------------------------------------------------------------
.../cache/ha/BlockingHARegionJUnitTest.java | 488 +++++++++----------
.../SerializableErrorCollector.java | 24 +
2 files changed, 242 insertions(+), 270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/00b9eb87/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
index d0f5793..3c1adc3 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
@@ -14,76 +14,114 @@
*/
package org.apache.geode.internal.cache.ha;
+import static java.util.concurrent.TimeUnit.*;
import static org.apache.geode.distributed.ConfigurationProperties.*;
+import static org.apache.geode.internal.cache.ha.HARegionQueue.*;
import static org.junit.Assert.*;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.dunit.ThreadUtils;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableErrorCollector;
+/**
+ * Integration tests for Blocking HARegionQueue.
+ *
+ * <p>
+ * #40314: Filled up queue causes all publishers to block
+ *
+ * <p>
+ * #37627: In case of out of order messages, (sequence Id violation), in spite of HARQ not full, the capacity (putPermits) of the HARQ exhausted.
+ */
@Category({IntegrationTest.class, ClientSubscriptionTest.class})
public class BlockingHARegionJUnitTest {
- private static InternalCache cache = null;
+ public static final String REGION = "BlockingHARegionJUnitTest_Region";
+ private static final long THREAD_TIMEOUT = 2 * 60 * 1000;
+
+ private final Object numberForThreadsLock = new Object();
+ private int numberForDoPuts;
+ private int numberForDoTakes;
+
+ volatile boolean stopThreads;
- /** boolean to record an exception occurence in another thread **/
- private static volatile boolean exceptionOccurred = false;
- /** StringBuffer to store the exception **/
- private static StringBuffer exceptionString = new StringBuffer();
- /** boolen to quit the for loop **/
- private static volatile boolean quitForLoop = false;
+ private InternalCache cache;
+ private HARegionQueueAttributes queueAttributes;
+ private List<Thread> threads;
+ private ThreadGroup threadGroup;
+
+ @Rule
+ public SerializableErrorCollector errorCollector = new SerializableErrorCollector();
@Before
public void setUp() throws Exception {
- Properties props = new Properties();
- props.setProperty(MCAST_PORT, "0");
- if (cache != null) {
- cache.close(); // fault tolerance
+ synchronized (this.numberForThreadsLock) {
+ this.numberForDoPuts = 0;
+ this.numberForDoTakes = 0;
+ }
+
+ this.stopThreads = false;
+ this.threads = new ArrayList<>();
+ this.threadGroup = new ThreadGroup(getClass().getSimpleName()) {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ errorCollector.addError(e);
+ }
+ };
+
+ this.queueAttributes = new HARegionQueueAttributes();
+
+ Properties config = new Properties();
+ config.setProperty(MCAST_PORT, "0");
+
+ this.cache = (InternalCache) CacheFactory.create(DistributedSystem.connect(config));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ this.stopThreads = true;
+ for (Thread thread : this.threads) {
+ thread.interrupt();
+ ThreadUtils.join(thread, THREAD_TIMEOUT);
+ }
+ } finally {
+ if (this.cache != null) {
+ this.cache.close();
+ }
}
- cache = (InternalCache) CacheFactory.create(DistributedSystem.connect(props));
}
/**
- * This test has a scenario where the HAReqionQueue capacity is just 1. There will be two thread.
+ * This test has a scenario where the HARegionQueue capacity is just 1. There will be two thread.
* One doing a 1000 puts and the other doing a 1000 takes. The validation for this test is that it
* should not encounter any exceptions
*/
@Test
public void testBoundedPuts() throws Exception {
- exceptionOccurred = false;
- HARegionQueueAttributes harqa = new HARegionQueueAttributes();
- harqa.setBlockingQueueCapacity(1);
- HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance("BlockingHARegionJUnitTest_Region",
- cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
- hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
- Thread thread1 = new DoPuts(hrq, 1000);
- Thread thread2 = new DoTake(hrq, 1000);
-
- thread1.start();
- thread2.start();
-
- ThreadUtils.join(thread1, 30 * 1000);
- ThreadUtils.join(thread2, 30 * 1000);
-
- if (exceptionOccurred) {
- fail(" Test failed due to " + exceptionString);
- }
+ this.queueAttributes.setBlockingQueueCapacity(1);
+ HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes, BLOCKING_HA_QUEUE, false);
+ hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
- cache.close();
+ startDoPuts(hrq, 1000);
+ startDoTakes(hrq, 1000);
}
/**
@@ -96,62 +134,22 @@ public class BlockingHARegionJUnitTest {
*/
@Test
public void testPutBeingBlocked() throws Exception {
- exceptionOccurred = false;
- quitForLoop = false;
- HARegionQueueAttributes harqa = new HARegionQueueAttributes();
- harqa.setBlockingQueueCapacity(1);
- final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
- "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
- hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
- final Thread thread1 = new DoPuts(hrq, 2);
- thread1.start();
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return hrq.region.size() == 2;
- }
+ this.queueAttributes.setBlockingQueueCapacity(1);
+ HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes, BLOCKING_HA_QUEUE, false);
+ hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
- public String description() {
- return null;
- }
- };
- Wait.waitForCriterion(ev, 1000, 200, true);
- assertTrue(thread1.isAlive()); // thread should still be alive (in wait state)
-
- Thread thread2 = new DoTake(hrq, 1);
- thread2.start(); // start take thread
- ev = new WaitCriterion() {
- public boolean done() {
- return hrq.region.size() == 3;
- }
+ Thread doPuts = startDoPuts(hrq, 2);
- public String description() {
- return null;
- }
- };
- // sleep. take will proceed and so will sleeping put
- Wait.waitForCriterion(ev, 3 * 1000, 200, true);
+ await().until(() -> assertTrue(hrq.region.size() == 2));
- // thread should have died since put should have proceeded
- ev = new WaitCriterion() {
- public boolean done() {
- return !thread1.isAlive();
- }
+ // thread should still be alive (in wait state)
+ assertTrue(doPuts.isAlive());
- public String description() {
- return "thread1 still alive";
- }
- };
- Wait.waitForCriterion(ev, 30 * 1000, 1000, true);
+ startDoTakes(hrq, 1);
- ThreadUtils.join(thread1, 30 * 1000); // for completeness
- ThreadUtils.join(thread2, 30 * 1000);
- if (exceptionOccurred) {
- fail(" Test failed due to " + exceptionString);
- }
- cache.close();
+ await().until(() -> assertTrue(hrq.region.size() == 3));
}
-
/**
* This test tests that the region capacity is never exceeded even in highly concurrent
* environments. The region capacity is set to 10000. Then 5 threads start doing put
@@ -161,62 +159,25 @@ public class BlockingHARegionJUnitTest {
*/
@Test
public void testConcurrentPutsNotExceedingLimit() throws Exception {
- exceptionOccurred = false;
- quitForLoop = false;
- HARegionQueueAttributes harqa = new HARegionQueueAttributes();
- harqa.setBlockingQueueCapacity(10000);
- final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
- "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
- hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
- Thread thread1 = new DoPuts(hrq, 20000, 1);
- Thread thread2 = new DoPuts(hrq, 20000, 2);
- Thread thread3 = new DoPuts(hrq, 20000, 3);
- Thread thread4 = new DoPuts(hrq, 20000, 4);
- Thread thread5 = new DoPuts(hrq, 20000, 5);
-
- thread1.start();
- thread2.start();
- thread3.start();
- thread4.start();
- thread5.start();
-
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return hrq.region.size() == 20000;
- }
-
- public String description() {
- return null;
- }
- };
- Wait.waitForCriterion(ev, 30 * 1000, 200, true);
-
- assertTrue(thread1.isAlive());
- assertTrue(thread2.isAlive());
- assertTrue(thread3.isAlive());
- assertTrue(thread4.isAlive());
- assertTrue(thread5.isAlive());
-
- assertTrue(hrq.region.size() == 20000);
+ this.queueAttributes.setBlockingQueueCapacity(10000);
+ HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes, BLOCKING_HA_QUEUE, false);
+ hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
- quitForLoop = true;
- Thread.sleep(20000);
+ Thread doPuts1 = startDoPuts(hrq, 20000, 1);
+ Thread doPuts2 = startDoPuts(hrq, 20000, 2);
+ Thread doPuts3 = startDoPuts(hrq, 20000, 3);
+ Thread doPuts4 = startDoPuts(hrq, 20000, 4);
+ Thread doPuts5 = startDoPuts(hrq, 20000, 5);
- thread1.interrupt();
- thread2.interrupt();
- thread3.interrupt();
- thread4.interrupt();
- thread5.interrupt();
+ await().until(() -> assertTrue(hrq.region.size() == 20000));
- Thread.sleep(2000);
+ assertTrue(doPuts1.isAlive());
+ assertTrue(doPuts2.isAlive());
+ assertTrue(doPuts3.isAlive());
+ assertTrue(doPuts4.isAlive());
+ assertTrue(doPuts5.isAlive());
- ThreadUtils.join(thread1, 5 * 60 * 1000);
- ThreadUtils.join(thread2, 5 * 60 * 1000);
- ThreadUtils.join(thread3, 5 * 60 * 1000);
- ThreadUtils.join(thread4, 5 * 60 * 1000);
- ThreadUtils.join(thread5, 5 * 60 * 1000);
-
- cache.close();
+ assertTrue(hrq.region.size() == 20000);
}
/**
@@ -226,84 +187,40 @@ public class BlockingHARegionJUnitTest {
* state. the region size would be verified to be 20000 (10000 puts and 10000 DACE objects). then
* the threads are interrupted and made to quit the loop
*/
- @Ignore("TODO: test is disabled")
+ @Ignore("Test is disabled until/if blocking queue capacity becomes a hard limit")
@Test
public void testConcurrentPutsTakesNotExceedingLimit() throws Exception {
- exceptionOccurred = false;
- quitForLoop = false;
- HARegionQueueAttributes harqa = new HARegionQueueAttributes();
- harqa.setBlockingQueueCapacity(10000);
- final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
- "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
- Thread thread1 = new DoPuts(hrq, 40000, 1);
- Thread thread2 = new DoPuts(hrq, 40000, 2);
- Thread thread3 = new DoPuts(hrq, 40000, 3);
- Thread thread4 = new DoPuts(hrq, 40000, 4);
- Thread thread5 = new DoPuts(hrq, 40000, 5);
-
- Thread thread6 = new DoTake(hrq, 5000);
- Thread thread7 = new DoTake(hrq, 5000);
- Thread thread8 = new DoTake(hrq, 5000);
- Thread thread9 = new DoTake(hrq, 5000);
- Thread thread10 = new DoTake(hrq, 5000);
-
- thread1.start();
- thread2.start();
- thread3.start();
- thread4.start();
- thread5.start();
-
- thread6.start();
- thread7.start();
- thread8.start();
- thread9.start();
- thread10.start();
-
- ThreadUtils.join(thread6, 30 * 1000);
- ThreadUtils.join(thread7, 30 * 1000);
- ThreadUtils.join(thread8, 30 * 1000);
- ThreadUtils.join(thread9, 30 * 1000);
- ThreadUtils.join(thread10, 30 * 1000);
-
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return hrq.region.size() == 20000;
- }
-
- public String description() {
- return null;
- }
- };
- Wait.waitForCriterion(ev, 30 * 1000, 200, true);
-
- assertTrue(thread1.isAlive());
- assertTrue(thread2.isAlive());
- assertTrue(thread3.isAlive());
- assertTrue(thread4.isAlive());
- assertTrue(thread5.isAlive());
+ this.queueAttributes.setBlockingQueueCapacity(10000);
+ HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes, BLOCKING_HA_QUEUE, false);
+ hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
+
+ Thread doPuts1 = startDoPuts(hrq, 40000, 1);
+ Thread doPuts2 = startDoPuts(hrq, 40000, 2);
+ Thread doPuts3 = startDoPuts(hrq, 40000, 3);
+ Thread doPuts4 = startDoPuts(hrq, 40000, 4);
+ Thread doPuts5 = startDoPuts(hrq, 40000, 5);
+
+ Thread doTakes1 = startDoTakes(hrq, 5000);
+ Thread doTakes2 = startDoTakes(hrq, 5000);
+ Thread doTakes3 = startDoTakes(hrq, 5000);
+ Thread doTakes4 = startDoTakes(hrq, 5000);
+ Thread doTakes5 = startDoTakes(hrq, 5000);
+
+ ThreadUtils.join(doTakes1, 30 * 1000);
+ ThreadUtils.join(doTakes2, 30 * 1000);
+ ThreadUtils.join(doTakes3, 30 * 1000);
+ ThreadUtils.join(doTakes4, 30 * 1000);
+ ThreadUtils.join(doTakes5, 30 * 1000);
+
+ await().until(() -> assertTrue(hrq.region.size() == 20000));
+
+ assertTrue(doPuts1.isAlive());
+ assertTrue(doPuts2.isAlive());
+ assertTrue(doPuts3.isAlive());
+ assertTrue(doPuts4.isAlive());
+ assertTrue(doPuts5.isAlive());
assertTrue(hrq.region.size() == 20000);
-
- quitForLoop = true;
-
- Thread.sleep(2000);
-
- thread1.interrupt();
- thread2.interrupt();
- thread3.interrupt();
- thread4.interrupt();
- thread5.interrupt();
-
- Thread.sleep(2000);
-
-
- ThreadUtils.join(thread1, 30 * 1000);
- ThreadUtils.join(thread2, 30 * 1000);
- ThreadUtils.join(thread3, 30 * 1000);
- ThreadUtils.join(thread4, 30 * 1000);
- ThreadUtils.join(thread5, 30 * 1000);
-
- cache.close();
}
/**
@@ -315,62 +232,91 @@ public class BlockingHARegionJUnitTest {
*/
@Test
public void testHARQMaxCapacity_Bug37627() throws Exception {
- try {
- exceptionOccurred = false;
- quitForLoop = false;
- HARegionQueueAttributes harqa = new HARegionQueueAttributes();
- harqa.setBlockingQueueCapacity(1);
- harqa.setExpiryTime(180);
- final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
- "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
- hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
- final EventID id1 = new EventID(new byte[] {1}, 1, 2); // violation
- final EventID ignore = new EventID(new byte[] {1}, 1, 1); //
- final EventID id2 = new EventID(new byte[] {1}, 1, 3); //
- Thread t1 = new Thread() {
- public void run() {
- try {
- hrq.put(new ConflatableObject("key1", "value1", id1, false, "region1"));
- hrq.take();
- hrq.put(new ConflatableObject("key2", "value1", ignore, false, "region1"));
- hrq.put(new ConflatableObject("key3", "value1", id2, false, "region1"));
- } catch (Exception e) {
- exceptionString.append("First Put in region queue failed");
- exceptionOccurred = true;
- }
+ this.queueAttributes.setBlockingQueueCapacity(1);
+ this.queueAttributes.setExpiryTime(180);
+ HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes, BLOCKING_HA_QUEUE, false);
+ hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
+
+ EventID event1 = new EventID(new byte[] {1}, 1, 2); // violation
+ EventID event2 = new EventID(new byte[] {1}, 1, 1); // ignored
+ EventID event3 = new EventID(new byte[] {1}, 1, 3);
+
+ newThread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ hrq.put(new ConflatableObject("key1", "value1", event1, false, "region1"));
+ hrq.take();
+ hrq.put(new ConflatableObject("key2", "value1", event2, false, "region1"));
+ hrq.put(new ConflatableObject("key3", "value1", event3, false, "region1"));
+ } catch (Exception e) {
+ errorCollector.addError(e);
}
- };
- t1.start();
- ThreadUtils.join(t1, 20 * 1000);
- if (exceptionOccurred) {
- fail(" Test failed due to " + exceptionString);
- }
- } finally {
- if (cache != null) {
- cache.close();
}
+ });
+ }
+
+ private Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(this.threadGroup, runnable);
+ this.threads.add(thread);
+ thread.start();
+ return thread;
+ }
+
+ private Thread startDoPuts(HARegionQueue haRegionQueue, int count) {
+ return startDoPuts(haRegionQueue, count, 0);
+ }
+
+ private Thread startDoPuts(HARegionQueue haRegionQueue, int count, int regionId) {
+ Thread thread = new DoPuts(this.threadGroup, haRegionQueue, count, regionId);
+ this.threads.add(thread);
+ thread.start();
+ return thread;
+ }
+
+ private Thread startDoTakes(HARegionQueue haRegionQueue, int count) {
+ Thread thread = new DoTakes(this.threadGroup, haRegionQueue, count);
+ this.threads.add(thread);
+ thread.start();
+ return thread;
+ }
+
+ private ConditionFactory await() {
+ return Awaitility.await().atMost(2, MINUTES);
+ }
+
+ int nextDoPutsThreadNum() {
+ synchronized (this.numberForThreadsLock) {
+ return numberForDoPuts++;
+ }
+ }
+
+ int nextDoTakesThreadNum() {
+ synchronized (this.numberForThreadsLock) {
+ return numberForDoTakes++;
}
}
/**
* class which does specified number of puts on the queue
*/
- private static class DoPuts extends Thread {
+ private class DoPuts extends Thread {
- HARegionQueue regionQueue = null;
- final int numberOfPuts;
+ private final HARegionQueue regionQueue;
- DoPuts(HARegionQueue haRegionQueue, int numberOfPuts) {
- this.regionQueue = haRegionQueue;
- this.numberOfPuts = numberOfPuts;
- }
+ private final int numberOfPuts;
/**
* region id can be specified to generate Thread unique events
*/
- int regionId = 0;
+ private final int regionId;
- DoPuts(HARegionQueue haRegionQueue, int numberOfPuts, int regionId) {
+ DoPuts(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfPuts) {
+ this(threadGroup, haRegionQueue, numberOfPuts, 0);
+ }
+
+ DoPuts(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfPuts, int regionId) {
+ super(threadGroup, "DoPuts-" + nextDoPutsThreadNum());
this.regionQueue = haRegionQueue;
this.numberOfPuts = numberOfPuts;
this.regionId = regionId;
@@ -378,19 +324,16 @@ public class BlockingHARegionJUnitTest {
@Override
public void run() {
- for (int i = 0; i < numberOfPuts; i++) {
+ for (int i = 0; i < this.numberOfPuts; i++) {
+ if (stopThreads || Thread.currentThread().isInterrupted()) {
+ break;
+ }
try {
this.regionQueue.put(new ConflatableObject("" + i, "" + i,
- new EventID(new byte[regionId], i, i), false, "BlockingHARegionJUnitTest_Region"));
- if (quitForLoop) {
- break;
- }
- if (Thread.currentThread().isInterrupted()) {
- break;
- }
+ new EventID(new byte[this.regionId], i, i), false, REGION));
} catch (Exception e) {
- exceptionOccurred = true;
- exceptionString.append(" Exception occurred due to " + e);
+ errorCollector.addError(e);
+ break;
}
}
}
@@ -399,24 +342,29 @@ public class BlockingHARegionJUnitTest {
/**
* class which does a specified number of takes
*/
- private static class DoTake extends Thread {
+ private class DoTakes extends Thread {
- final HARegionQueue regionQueue;
- final int numberOfTakes;
+ private final HARegionQueue regionQueue;
- DoTake(HARegionQueue haRegionQueue, int numberOfTakes) {
+ private final int numberOfTakes;
+
+ DoTakes(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfTakes) {
+ super(threadGroup, "DoTakes-" + nextDoTakesThreadNum());
this.regionQueue = haRegionQueue;
this.numberOfTakes = numberOfTakes;
}
@Override
public void run() {
- for (int i = 0; i < numberOfTakes; i++) {
+ for (int i = 0; i < this.numberOfTakes; i++) {
+ if (stopThreads || Thread.currentThread().isInterrupted()) {
+ break;
+ }
try {
assertNotNull(this.regionQueue.take());
} catch (Exception e) {
- exceptionOccurred = true;
- exceptionString.append(" Exception occurred due to " + e);
+ errorCollector.addError(e);
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/00b9eb87/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableErrorCollector.java
----------------------------------------------------------------------
diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableErrorCollector.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableErrorCollector.java
new file mode 100644
index 0000000..0abfdaf
--- /dev/null
+++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableErrorCollector.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.test.junit.rules.serializable;
+
+import org.junit.rules.ErrorCollector;
+
+import java.io.Serializable;
+
+public class SerializableErrorCollector extends ErrorCollector implements Serializable {
+}