You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by la...@apache.org on 2018/08/03 16:36:46 UTC
[geode] branch develop updated: GEODE-5495: Destroy available ID
before decrement in updateHAContainer()
This is an automated email from the ASF dual-hosted git repository.
ladyvader pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new acb50f9 GEODE-5495: Destroy available ID before decrement in updateHAContainer()
acb50f9 is described below
commit acb50f94157c25ada4163771758115a1cb108516
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Wed Aug 1 11:33:38 2018 -0700
GEODE-5495: Destroy available ID before decrement in updateHAContainer()
Co-authored-by: Ryan McMahon <rm...@pivotal.io>
Co-authored-by: Lynn Hughes-Godfrey <lh...@pivotal.io>
---
.../internal/cache/ha/HARegionQueueDUnitTest.java | 4 +-
.../cache/ha/HARQAddOperationJUnitTest.java | 36 ++---
.../cache/ha/HARegionQueueIntegrationTest.java | 173 ++++++++++++++++++---
.../internal/cache/ha/HARegionQueueJUnitTest.java | 24 +--
.../geode/internal/cache/ha/HARegionQueue.java | 20 +--
5 files changed, 198 insertions(+), 59 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
index 19d2bf2..3d8c452 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
@@ -955,7 +955,7 @@ public class HARegionQueueDUnitTest extends JUnit4DistributedTestCase {
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
- return hrq.getAvalaibleIds().size() == 0;
+ return hrq.getAvailableIds().size() == 0;
}
@Override
@@ -964,7 +964,7 @@ public class HARegionQueueDUnitTest extends JUnit4DistributedTestCase {
}
};
Wait.waitForCriterion(ev, 60 * 1000, 200, true);
- // assertIndexDetailsEquals(0, hrq.getAvalaibleIds().size());
+ // assertIndexDetailsEquals(0, hrq.getAvailableIds().size());
}
@Test
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARQAddOperationJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARQAddOperationJUnitTest.java
index cffd3e7..871fa09 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARQAddOperationJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARQAddOperationJUnitTest.java
@@ -147,7 +147,7 @@ public class HARQAddOperationJUnitTest {
Long cntr = (Long) conflationMap.get(KEY1);
ConflatableObject retValue = (ConflatableObject) rq.getRegion().get(cntr);
assertEquals(VALUE2, retValue.getValueToConflate());
- assertEquals(1, rq.getAvalaibleIds().size());
+ assertEquals(1, rq.getAvailableIds().size());
assertEquals(1, rq.getCurrentCounterSet(id1).size());
this.logWriter.info("HARegionQueueJUnitTest : testQueueAddOperationWithConflation END");
@@ -171,13 +171,13 @@ public class HARQAddOperationJUnitTest {
this.rq.put(c1);
assertNull(rq.getConflationMapForTesting().get("region1"));
- assertEquals(1, rq.getAvalaibleIds().size());
+ assertEquals(1, rq.getAvailableIds().size());
assertEquals(1, rq.getCurrentCounterSet(id1).size());
this.rq.put(c2);
assertNull(rq.getConflationMapForTesting().get("region1"));
- assertEquals(2, rq.getAvalaibleIds().size());
+ assertEquals(2, rq.getAvailableIds().size());
assertEquals(2, rq.getCurrentCounterSet(id1).size());
Iterator iter = rq.getCurrentCounterSet(id1).iterator();
@@ -212,7 +212,7 @@ public class HARQAddOperationJUnitTest {
this.rq.put(obj);
this.rq.take();
assertNull(rq.getRegion().get(KEY1));
- assertEquals(0, this.rq.getAvalaibleIds().size());
+ assertEquals(0, this.rq.getAvailableIds().size());
Map eventsMap = this.rq.getEventsMapForTesting();
assertEquals(1, eventsMap.size());
assertEquals(0, rq.getCurrentCounterSet(id).size());
@@ -296,7 +296,7 @@ public class HARQAddOperationJUnitTest {
// removed from Region, LastDispatchedWrapperSet should have size 0.
Awaitility.await().atMost(60, TimeUnit.SECONDS)
.until(() -> assertEquals(0, regionqueue.getRegion().entrySet(false).size()));
- assertEquals(0, regionqueue.getAvalaibleIds().size());
+ assertEquals(0, regionqueue.getAvailableIds().size());
assertNull(regionqueue.getCurrentCounterSet(id1));
} catch (Exception e) {
@@ -323,11 +323,11 @@ public class HARQAddOperationJUnitTest {
}
// Available id size should be == 10 after puting ten entries
- assertEquals(10, regionqueue.getAvalaibleIds().size());
+ assertEquals(10, regionqueue.getAvailableIds().size());
// QRM message for therad id 1 and last sequence id 5
regionqueue.removeDispatchedEvents(ids[4]);
- assertEquals(5, regionqueue.getAvalaibleIds().size());
+ assertEquals(5, regionqueue.getAvailableIds().size());
assertEquals(5, regionqueue.getCurrentCounterSet(ids[0]).size());
Iterator iter = regionqueue.getCurrentCounterSet(ids[0]).iterator();
@@ -338,7 +338,7 @@ public class HARQAddOperationJUnitTest {
}
regionqueue.removeDispatchedEvents(ids[9]);
- assertEquals(0, regionqueue.getAvalaibleIds().size());
+ assertEquals(0, regionqueue.getAvailableIds().size());
}
/**
@@ -388,7 +388,7 @@ public class HARQAddOperationJUnitTest {
if (testFailed)
fail("Test failed due to " + message);
- assertEquals(0, regionqueue.getAvalaibleIds().size());
+ assertEquals(0, regionqueue.getAvailableIds().size());
assertEquals(2, regionqueue.getLastDispatchedSequenceId(id2));
}
@@ -438,7 +438,7 @@ public class HARQAddOperationJUnitTest {
if (testFailed)
fail("Test failed due to " + message);
- assertEquals(0, regionqueue.getAvalaibleIds().size());
+ assertEquals(0, regionqueue.getAvailableIds().size());
assertEquals(2, regionqueue.getLastDispatchedSequenceId(id2));
}
@@ -531,8 +531,8 @@ public class HARQAddOperationJUnitTest {
+ regionqueue.getEventsMapForTesting().size(),
numOfThreads, regionqueue.getEventsMapForTesting().size());
assertEquals(
- "size of availableids should 1 but actual size " + regionqueue.getAvalaibleIds().size(), 1,
- regionqueue.getAvalaibleIds().size());
+ "size of availableids should 1 but actual size " + regionqueue.getAvailableIds().size(), 1,
+ regionqueue.getAvailableIds().size());
int count = 0;
for (int i = 0; i < numOfThreads; i++) {
if ((regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, i))).size() > 0) {
@@ -543,8 +543,8 @@ public class HARQAddOperationJUnitTest {
assertEquals("size of the counter set is 1 but the actual size is " + count, 1, count);
Long position = null;
- if (regionqueue.getAvalaibleIds().size() == 1) {
- position = (Long) regionqueue.getAvalaibleIds().iterator().next();
+ if (regionqueue.getAvailableIds().size() == 1) {
+ position = (Long) regionqueue.getAvailableIds().iterator().next();
}
ConflatableObject id = (ConflatableObject) regionqueue.getRegion().get(position);
assertEquals(regionqueue.getCurrentCounterSet(id.getEventId()).size(), 1);
@@ -605,7 +605,7 @@ public class HARQAddOperationJUnitTest {
regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size());
}
- assertEquals(0, regionqueue.getAvalaibleIds().size());
+ assertEquals(0, regionqueue.getAvailableIds().size());
this.logWriter.info("testPeekAndRemoveWithoutConflation() completed successfully");
}
@@ -666,7 +666,7 @@ public class HARQAddOperationJUnitTest {
regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size());
}
- assertEquals("size of availableIds map should be 0 ", 0, regionqueue.getAvalaibleIds().size());
+ assertEquals("size of availableIds map should be 0 ", 0, regionqueue.getAvailableIds().size());
assertEquals("size of conflation map should be 0 ", 0,
((Map) regionqueue.getConflationMapForTesting().get("region1")).size());
@@ -770,7 +770,7 @@ public class HARQAddOperationJUnitTest {
regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size());
}
- assertEquals(0, regionqueue.getAvalaibleIds().size());
+ assertEquals(0, regionqueue.getAvailableIds().size());
this.logWriter.info("testPeekForDiffBatchSizeAndRemoveAll() completed successfully");
}
@@ -863,7 +863,7 @@ public class HARQAddOperationJUnitTest {
if (testFailed)
fail("Test failed due to " + message);
- assertEquals(5, regionqueue.getAvalaibleIds().size());
+ assertEquals(5, regionqueue.getAvailableIds().size());
this.logWriter.info("testPeekForDiffBatchSizeAndRemoveSome() completed successfully");
}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
index a1b71a1..0270177 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -16,6 +16,7 @@ package org.apache.geode.internal.cache.ha;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -39,6 +40,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -387,24 +389,7 @@ public class HARegionQueueIntegrationTest {
List<HARegionQueue> regionQueues = new ArrayList<>();
for (int i = 0; i < 2; ++i) {
- HARegion haRegion = Mockito.mock(HARegion.class);
- when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
-
- ConcurrentHashMap<Object, Object> mockRegion = new ConcurrentHashMap<>();
-
- when(haRegion.put(Mockito.any(Object.class), Mockito.any(Object.class))).then(answer -> {
- Object existingValue = mockRegion.put(answer.getArgument(0), answer.getArgument(1));
- return existingValue;
- });
-
- when(haRegion.get(Mockito.any(Object.class))).then(answer -> {
- return mockRegion.get(answer.getArgument(0));
- });
-
- doAnswer(answer -> {
- mockRegion.remove(answer.getArgument(0));
- return null;
- }).when(haRegion).localDestroy(Mockito.any(Object.class));
+ HARegion haRegion = createMockHARegion();
regionQueues.add(createHARegionQueue(haContainerWrapper, i, haRegion, false));
}
@@ -509,6 +494,158 @@ public class HARegionQueueIntegrationTest {
Assert.assertEquals("Container size was not the expected value", haContainerWrapper.size(), 1);
}
+ @Test
+ public void removeDispatchedEventsViaQRMAndDestroyQueueSimultaneouslySingleDecrement()
+ throws Exception {
+ HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+
+ HARegion haRegion = createMockHARegion();
+ HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, 0, haRegion, false);
+
+ EventID eventID = new EventID(cache.getDistributedSystem());
+ ClientUpdateMessage clientUpdateMessage =
+ new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+ (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+ new ClientProxyMembershipID(), eventID);
+ HAEventWrapper haEventWrapper = new HAEventWrapper(clientUpdateMessage);
+ haEventWrapper.incrementPutInProgressCounter();
+ haEventWrapper.setHAContainer(haContainerWrapper);
+
+ haRegionQueue.put(haEventWrapper);
+
+ ExecutorService service = Executors.newFixedThreadPool(2);
+
+ List<Callable<Object>> callables = new ArrayList<>();
+
+ // In one thread, simulate processing a queue removal message
+ // by removing the dispatched event
+ callables.add(Executors.callable(() -> {
+ try {
+ haRegionQueue.removeDispatchedEvents(eventID);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }));
+
+ // In another thread, simulate that the region is being destroyed, for instance
+ // when a SocketTimeoutException is thrown and we are cleaning up
+ callables.add(Executors.callable(() -> {
+ try {
+ haRegionQueue.destroy();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }));
+
+ List<Future<Object>> futures = service.invokeAll(callables, 10, TimeUnit.SECONDS);
+
+ for (Future<Object> future : futures) {
+ try {
+ future.get();
+ } catch (Exception ex) {
+ throw new TestException(
+ "Exception thrown while executing queue removal and destroy region queue logic concurrently.",
+ ex);
+ }
+ }
+
+ try {
+ await().atMost(10, TimeUnit.SECONDS).until(() -> haEventWrapper.getReferenceCount() == 0);
+ } catch (ConditionTimeoutException conditionTimeoutException) {
+ throw new TestException(
+ "Expected HAEventWrapper reference count to be decremented to 0 by either the queue removal or destroy queue logic, but the actual reference count was "
+ + haEventWrapper.getReferenceCount());
+ }
+ }
+
+ @Test
+ public void removeDispatchedEventsViaMessageDispatcherAndDestroyQueueSimultaneouslySingleDecrement()
+ throws Exception {
+ HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+
+ HARegion haRegion = createMockHARegion();
+ HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, 0, haRegion, false);
+
+ EventID eventID = new EventID(cache.getDistributedSystem());
+ ClientUpdateMessage clientUpdateMessage =
+ new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+ (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+ new ClientProxyMembershipID(), eventID);
+ HAEventWrapper haEventWrapper = new HAEventWrapper(clientUpdateMessage);
+ haEventWrapper.incrementPutInProgressCounter();
+ haEventWrapper.setHAContainer(haContainerWrapper);
+
+ haRegionQueue.put(haEventWrapper);
+
+ ExecutorService service = Executors.newFixedThreadPool(2);
+
+ List<Callable<Object>> callables = new ArrayList<>();
+
+ // In one thread, simulate processing a queue removal message
+ // by removing the dispatched event
+ callables.add(Executors.callable(() -> {
+ try {
+ // Simulate dispatching a message by peeking and removing the HAEventWrapper
+ haRegionQueue.peek();
+ haRegionQueue.remove();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }));
+
+ // In another thread, simulate that the region is being destroyed, for instance
+ // when a SocketTimeoutException is thrown and we are cleaning up
+ callables.add(Executors.callable(() -> {
+ try {
+ haRegionQueue.destroy();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }));
+
+ List<Future<Object>> futures = service.invokeAll(callables, 10, TimeUnit.SECONDS);
+
+ for (Future<Object> future : futures) {
+ try {
+ future.get();
+ } catch (Exception ex) {
+ throw new TestException(
+ "Exception thrown while executing message dispatching and destroy region queue logic concurrently.",
+ ex);
+ }
+ }
+
+ try {
+ await().atMost(10, TimeUnit.SECONDS).until(() -> haEventWrapper.getReferenceCount() == 0);
+ } catch (ConditionTimeoutException conditionTimeoutException) {
+ throw new TestException(
+ "Expected HAEventWrapper reference count to be decremented to 0 by either the message dispatcher or destroy queue logic, but the actual reference count was "
+ + haEventWrapper.getReferenceCount());
+ }
+ }
+
+ private HARegion createMockHARegion() {
+ HARegion haRegion = Mockito.mock(HARegion.class);
+ when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
+
+ ConcurrentHashMap<Object, Object> mockRegion = new ConcurrentHashMap<>();
+
+ when(haRegion.put(Mockito.any(Object.class), Mockito.any(Object.class))).then(answer -> {
+ Object existingValue = mockRegion.put(answer.getArgument(0), answer.getArgument(1));
+ return existingValue;
+ });
+
+ when(haRegion.get(Mockito.any(Object.class))).then(answer -> {
+ return mockRegion.get(answer.getArgument(0));
+ });
+
+ doAnswer(answer -> {
+ mockRegion.remove(answer.getArgument(0));
+ return null;
+ }).when(haRegion).localDestroy(Mockito.any(Object.class));
+ return haRegion;
+ }
+
private HAContainerRegion createHAContainerRegion() throws Exception {
Region haContainerRegionRegion = createHAContainerRegionRegion();
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
index e1baf6c..9efb7ce 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
@@ -284,7 +284,7 @@ public class HARegionQueueJUnitTest {
!regionQueue.isEmpty(), is(true));
assertThat(
" Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ",
- !regionQueue.getAvalaibleIds().isEmpty(), is(true));
+ !regionQueue.getAvailableIds().isEmpty(), is(true));
assertThat(
" Expected conflation map size not to be zero since expiry time has not been exceeded but it is not so "
+ ((Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName()))
@@ -297,7 +297,7 @@ public class HARegionQueueJUnitTest {
waitAtLeast(1000, start, () -> {
assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet()));
- assertThat(regionQueue.getAvalaibleIds(), is(Collections.emptySet()));
+ assertThat(regionQueue.getAvailableIds(), is(Collections.emptySet()));
assertThat(regionQueue.getConflationMapForTesting().get(testName.getMethodName()),
is(Collections.emptyMap()));
assertThat(regionQueue.getEventsMapForTesting(), is(Collections.emptyMap()));
@@ -330,9 +330,9 @@ public class HARegionQueueJUnitTest {
" Expected region size not to be zero since expiry time has not been exceeded but it is not so ",
!regionQueue.isEmpty(), is(true));
assertThat(" Expected the available id's size not to have counter 1 but it has ",
- !regionQueue.getAvalaibleIds().contains(1L), is(true));
+ !regionQueue.getAvailableIds().contains(1L), is(true));
assertThat(" Expected the available id's size to have counter 2 but it does not have ",
- regionQueue.getAvalaibleIds().contains(2L), is(true));
+ regionQueue.getAvailableIds().contains(2L), is(true));
assertThat(" Expected eventID map not to have the first event, but it has",
!regionQueue.getCurrentCounterSet(ev1).contains(1L), is(true));
assertThat(" Expected eventID map to have the second event, but it does not",
@@ -393,7 +393,7 @@ public class HARegionQueueJUnitTest {
assertThat(" Expected region peek to return cf but it is not so ", regionQueue.peek(), is(cf));
assertThat(
" Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ",
- !regionQueue.getAvalaibleIds().isEmpty(), is(true));
+ !regionQueue.getAvailableIds().isEmpty(), is(true));
assertThat(
" Expected conflation map to have entry for this key since expiry time has not been exceeded but it is not so ",
((Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName())).get("key"),
@@ -458,12 +458,12 @@ public class HARegionQueueJUnitTest {
// verify 1-5 not in available Id's map
for (int i = 1; i < 6; i++) {
- assertThat(!regionQueue.getAvalaibleIds().contains((long) i), is(true));
+ assertThat(!regionQueue.getAvailableIds().contains((long) i), is(true));
}
// verify 6-10 in available id's map
for (int i = 6; i < 11; i++) {
- assertThat(regionQueue.getAvalaibleIds().contains((long) i), is(true));
+ assertThat(regionQueue.getAvailableIds().contains((long) i), is(true));
}
}
@@ -531,12 +531,12 @@ public class HARegionQueueJUnitTest {
// verify 1-7 not in available Id's map
for (int i = 4; i < 11; i++) {
- assertThat(!regionQueue.getAvalaibleIds().contains((long) i), is(true));
+ assertThat(!regionQueue.getAvailableIds().contains((long) i), is(true));
}
// verify 8-10 in available id's map
for (int i = 1; i < 4; i++) {
- assertThat(regionQueue.getAvalaibleIds().contains((long) i), is(true));
+ assertThat(regionQueue.getAvailableIds().contains((long) i), is(true));
}
}
@@ -562,9 +562,9 @@ public class HARegionQueueJUnitTest {
// the old key should not be present
assertThat(!regionQueue.getRegion().containsKey(1L), is(true));
// available ids should not contain the old id (the old position)
- assertThat(!regionQueue.getAvalaibleIds().contains(1L), is(true));
+ assertThat(!regionQueue.getAvailableIds().contains(1L), is(true));
// available id should have the new id (the new position)
- assertThat(regionQueue.getAvalaibleIds().contains(2L), is(true));
+ assertThat(regionQueue.getAvailableIds().contains(2L), is(true));
// events map should not contain the old position
assertThat(regionQueue.getCurrentCounterSet(ev1).isEmpty(), is(true));
// events map should contain the new position
@@ -589,7 +589,7 @@ public class HARegionQueueJUnitTest {
Map conflationMap = ((HARegionQueue) regionqueue).getConflationMapForTesting();
assertThat(((Map) conflationMap.get(testName.getMethodName())).size(), is(5));
- Set availableIDs = ((HARegionQueue) regionqueue).getAvalaibleIds();
+ Set availableIDs = ((HARegionQueue) regionqueue).getAvailableIds();
Set counters = ((HARegionQueue) regionqueue).getCurrentCounterSet(qrmID);
assertThat(availableIDs.size(), is(5));
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 8c241b2..860a410 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -1724,7 +1724,7 @@ public class HARegionQueue implements RegionQueue {
* Used for testing purposes only
*
*/
- Set getAvalaibleIds() {
+ Set getAvailableIds() {
acquireReadLock();
try {
return Collections.unmodifiableSet(this.idsAvailable);
@@ -1744,7 +1744,8 @@ public class HARegionQueue implements RegionQueue {
* @param lastDispatched EventID containing the ThreadIdentifier and the last dispatched sequence
* Id
*/
- void removeDispatchedEvents(EventID lastDispatched) throws CacheException, InterruptedException {
+ protected void removeDispatchedEvents(EventID lastDispatched)
+ throws CacheException, InterruptedException {
ThreadIdentifier ti = getThreadIdentifier(lastDispatched);
long sequenceID = lastDispatched.getSequenceID();
// get the DispatchedAndCurrentEvents object for this threadID
@@ -3645,20 +3646,22 @@ public class HARegionQueue implements RegionQueue {
*/
private void updateHAContainer() {
try {
- Object[] wrapperArray = null;
+ Object[] availableIdsArray = null;
acquireReadLock();
try {
if (this.availableIDsSize() != 0) {
- wrapperArray = this.availableIDsArray();
+ availableIdsArray = this.availableIDsArray();
}
} finally {
releaseReadLock();
}
- if (wrapperArray != null) {
+ if (availableIdsArray != null) {
final Set wrapperSet = new HashSet();
- for (int i = 0; i < wrapperArray.length; i++) {
- wrapperSet.add(this.region.get(wrapperArray[i]));
+ for (int i = 0; i < availableIdsArray.length; i++) {
+ if (destroyFromAvailableIDs((long) availableIdsArray[i])) {
+ wrapperSet.add(this.region.get(availableIdsArray[i]));
+ }
}
// Start a new thread which will update the clientMessagesRegion for
@@ -3671,8 +3674,7 @@ public class HARegionQueue implements RegionQueue {
while (iter.hasNext()) {
Conflatable conflatable = (Conflatable) iter.next();
if (conflatable instanceof HAEventWrapper) {
- HARegionQueue.this
- .decAndRemoveFromHAContainer((HAEventWrapper) conflatable, "Destroy");
+ decAndRemoveFromHAContainer((HAEventWrapper) conflatable, "Destroy");
}
}
} catch (CancelException ignore) {