You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2020/07/29 19:26:45 UTC
[geode] branch support/1.13 updated: GEODE-8323: Process
QueueRemovalMessage after queue initialized. (#5333)
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new 3c87e10 GEODE-8323: Process QueueRemovalMessage after queue initialized. (#5333)
3c87e10 is described below
commit 3c87e1059c11d52cb2602d76765a4118b86ef8cb
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Tue Jul 7 08:50:57 2020 -0700
GEODE-8323: Process QueueRemovalMessage after queue initialized. (#5333)
(cherry picked from commit 625a204f1a7173323648c05bb0002aded06e2a69)
---
.../org/apache/geode/internal/cache/HARegion.java | 15 +-
.../geode/internal/cache/ha/HARegionQueue.java | 46 ++++-
.../internal/cache/ha/QueueRemovalMessage.java | 169 ++++++++--------
.../apache/geode/internal/cache/HARegionTest.java | 77 ++++++++
.../geode/internal/cache/ha/HARegionQueueTest.java | 39 ++++
.../internal/cache/ha/QueueRemovalMessageTest.java | 215 +++++++++++++++++++++
6 files changed, 474 insertions(+), 87 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
index 1ab0dbd..98f6fb5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
@@ -20,6 +20,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
@@ -92,7 +93,7 @@ public class HARegion extends DistributedRegion {
private volatile HARegionQueue owningQueue;
- private HARegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion,
+ HARegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion,
InternalCache cache, StatisticsClock statisticsClock) {
super(regionName, attrs, parentRegion, cache,
new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
@@ -274,6 +275,18 @@ public class HARegion extends DistributedRegion {
return this.owningQueue.isQueueInitialized() ? this.owningQueue : null;
}
+  /**
+   * Returns the owning queue once it reports itself initialized, waiting up to the given time.
+   *
+   * @param timeout value handed to {@link HARegionQueue#isQueueInitializedWithWait(long)}; the
+   *        debug message converts it from milliseconds to seconds
+   * @return the owning {@link HARegionQueue}, or {@code null} if the queue still does not report
+   *         itself initialized after the wait
+   */
+  public HARegionQueue getOwnerWithWait(long timeout) {
+    final boolean ready = owningQueue.isQueueInitializedWithWait(timeout);
+    if (!ready) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("After waiting for {} seconds, queue is still not initialized",
+            TimeUnit.MILLISECONDS.toSeconds(timeout));
+      }
+      return null;
+    }
+    return owningQueue;
+  }
+
@Override
public CachePerfStats getCachePerfStats() {
return this.haRegionStats;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 5d61284..77e10248e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -335,7 +335,6 @@ public class HARegionQueue implements RegionQueue {
ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary,
StatisticsClock statisticsClock)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-
String processedRegionName = createRegionName(regionName);
// Initialize the statistics
@@ -374,7 +373,10 @@ public class HARegionQueue implements RegionQueue {
putGIIDataInRegion();
if (this.getClass() == HARegionQueue.class) {
- initialized.set(true);
+ synchronized (initialized) {
+ initialized.set(true);
+ initialized.notifyAll();
+ }
}
}
@@ -407,10 +409,31 @@ public class HARegionQueue implements RegionQueue {
putGIIDataInRegion();
}
if (this.getClass() == HARegionQueue.class) {
- initialized.set(true);
+ synchronized (initialized) {
+ initialized.set(true);
+ initialized.notifyAll();
+ }
}
}
+ /**
+ * Reports whether this queue is initialized, waiting at most once for the initialization
+ * notification if it is not.
+ *
+ * <p>
+ * The check and the wait both happen while holding the monitor of {@code initialized}, the same
+ * monitor on which the initialization code calls {@code notifyAll()} after setting the flag.
+ *
+ * <p>
+ * NOTE(review): the wait is issued once and is not retried in a loop, so any wake-up that
+ * arrives before the flag is set makes this method return {@code false} before
+ * {@code waitTime} has elapsed.
+ *
+ * @param waitTime value handed to {@link #waitForInitialized(long)}
+ * @return the value of {@link #isQueueInitialized()} after the optional wait; {@code false} if
+ *         the calling thread was interrupted while waiting (its interrupt status is restored)
+ */
+ public boolean isQueueInitializedWithWait(long waitTime) {
+ try {
+ synchronized (initialized) {
+ if (!isQueueInitialized()) {
+ waitForInitialized(waitTime);
+ }
+ }
+ } catch (InterruptedException interruptedException) {
+ // Preserve the interrupt for the caller instead of swallowing it.
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return isQueueInitialized();
+ }
+
+ /**
+ * Blocks on the monitor of {@code initialized} via {@code initialized.wait(waitTime)}.
+ *
+ * <p>
+ * The caller must already hold that monitor; {@link #isQueueInitializedWithWait(long)} calls
+ * this from inside its {@code synchronized (initialized)} block. The method is package-private
+ * and kept separate from its caller; the unit tests stub and verify it on a spy.
+ *
+ * @param waitTime timeout passed unchanged to {@code initialized.wait}
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ void waitForInitialized(long waitTime) throws InterruptedException {
+ initialized.wait(waitTime);
+ }
+
/**
* install DACE information from an initial image provider
*/
@@ -2215,7 +2238,10 @@ public class HARegionQueue implements RegionQueue {
super.putGIIDataInRegion();
if (this.getClass() == BlockingHARegionQueue.class) {
- initialized.set(true);
+ synchronized (initialized) {
+ initialized.set(true);
+ initialized.notifyAll();
+ }
}
}
@@ -2438,12 +2464,14 @@ public class HARegionQueue implements RegionQueue {
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
super(regionName, cache, hrqa, haContainer, clientProxyId, clientConflation, isPrimary,
statisticsClock);
+ threadIdToSeqId.keepPrevAcks = true;
+ durableIDsList = new LinkedHashSet();
+ ackedEvents = new HashMap();
- this.threadIdToSeqId.keepPrevAcks = true;
- this.durableIDsList = new LinkedHashSet();
- this.ackedEvents = new HashMap();
- this.initialized.set(true);
-
+ synchronized (initialized) {
+ initialized.set(true);
+ initialized.notifyAll();
+ }
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
index 297aa8c..f789411 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
@@ -14,7 +14,7 @@
*/
package org.apache.geode.internal.cache.ha;
-import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import java.io.DataInput;
import java.io.DataOutput;
@@ -36,7 +36,6 @@ import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.HARegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -52,20 +51,20 @@ public class QueueRemovalMessage extends PooledDistributionMessage {
/**
* List of messages (String[] )
*/
- private List messagesList;
+ private List<Object> messagesList;
/**
* Constructor : Set the recipient list to ALL_RECIPIENTS
*/
public QueueRemovalMessage() {
- this.setRecipient(ALL_RECIPIENTS);
+ setRecipient(ALL_RECIPIENTS);
}
/**
* Set the message list
*/
- public void setMessagesList(List messages) {
- this.messagesList = messages;
+ void setMessagesList(List messages) {
+ this.messagesList = uncheckedCast(messages);
}
/**
@@ -76,72 +75,88 @@ public class QueueRemovalMessage extends PooledDistributionMessage {
protected void process(ClusterDistributionManager dm) {
final InternalCache cache = dm.getCache();
if (cache != null) {
- Iterator iterator = this.messagesList.iterator();
- final InitializationLevel oldLevel =
- LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
- try {
- while (iterator.hasNext()) {
- final String regionName = (String) iterator.next();
- final int size = (Integer) iterator.next();
- final LocalRegion region = (LocalRegion) cache.getRegion(regionName);
- final HARegionQueue hrq;
- if (region == null || !region.isInitialized()) {
- hrq = null;
- } else {
- HARegionQueue tmp = ((HARegion) region).getOwner();
- if (tmp != null && tmp.isQueueInitialized()) {
- hrq = tmp;
- } else {
- hrq = null;
- }
- }
- // we have to iterate even if the hrq isn't available since there are
- // a bunch of event IDs to go through
- for (int i = 0; i < size; i++) {
- final EventID id = (EventID) iterator.next();
- boolean interrupted = Thread.interrupted();
- if (hrq == null || !hrq.isQueueInitialized()) {
- continue;
- }
- try {
- // Fix for bug 39516: inline removal of events by QRM.
- // dm.getWaitingThreadPool().execute(new Runnable() {
- // public void run()
- // {
- try {
- if (logger.isTraceEnabled()) {
- logger.trace("QueueRemovalMessage: removing dispatched events on queue {} for {}",
- regionName, id);
- }
- hrq.removeDispatchedEvents(id);
- } catch (RegionDestroyedException ignore) {
- logger.info(
- "Queue found destroyed while processing the last disptached sequence ID for a HARegionQueue's DACE. The event ID is {} for HARegion with name={}",
- new Object[] {id, regionName});
- } catch (CancelException ignore) {
- return; // cache or DS is closing
- } catch (CacheException e) {
- logger.error(String.format(
- "QueueRemovalMessage::process:Exception in processing the last disptached sequence ID for a HARegionQueue's DACE. The problem is with event ID,%s for HARegion with name=%s",
- new Object[] {regionName, id}),
- e);
- } catch (InterruptedException ignore) {
- return; // interrupt occurs during shutdown. this runs in an executor, so just stop
- // processing
- }
- } catch (RejectedExecutionException ignore) {
- interrupted = true;
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- } // if
- } // for
- } finally {
- LocalRegion.setThreadInitLevelRequirement(oldLevel);
+ Iterator iterator = messagesList.iterator();
+ processRegionQueues(cache, iterator);
+ }
+ }
+
+  /**
+   * Walks the flattened message list and processes the entries of every region found in it.
+   *
+   * <p>
+   * For each region the iterator yields the region name, an event count, and then that many
+   * event IDs. The region is looked up in the cache; when it exists, its owning queue is obtained
+   * with a bounded wait for initialization, otherwise no queue is used for that region.
+   * Processing stops as soon as {@link #processRegionQueue} returns {@code false}.
+   *
+   * @param cache cache used to resolve region names
+   * @param iterator iterator over the message list, positioned at a region name
+   */
+  void processRegionQueues(InternalCache cache, Iterator iterator) {
+    while (iterator.hasNext()) {
+      final String regionName = (String) iterator.next();
+      final int size = (Integer) iterator.next();
+      final LocalRegion region = (LocalRegion) cache.getRegion(regionName);
+
+      HARegionQueue hrq = null;
+      if (region != null) {
+        final long maxWaitTimeForInitialization = 30000;
+        hrq = ((HARegion) region).getOwnerWithWait(maxWaitTimeForInitialization);
+      } else if (logger.isDebugEnabled()) {
+        logger.debug("processing QRM region {} does not exist.", regionName);
+      }
+
+      // Called even without a queue so the iterator advances past this region's event IDs.
+      if (!processRegionQueue(iterator, regionName, size, hrq)) {
+        return;
+      }
+    }
+  }
+
+  /**
+   * Consumes {@code size} event IDs from the iterator and removes each one from the given queue.
+   *
+   * <p>
+   * Every event ID is read from the iterator even when the queue is {@code null} or not
+   * initialized, so that the iterator ends up positioned at the next region's entries; in that
+   * case the event is only logged at debug level and skipped.
+   *
+   * @param iterator iterator positioned at the first event ID of the region
+   * @param regionName name of the region, used for logging
+   * @param size number of event IDs to consume
+   * @param hrq queue to remove events from; may be {@code null}
+   * @return {@code false} as soon as {@link #removeQueueEvent} returns {@code false},
+   *         {@code true} otherwise
+   */
+  boolean processRegionQueue(Iterator iterator, String regionName, int size,
+      HARegionQueue hrq) {
+    for (int consumed = 0; consumed < size; consumed++) {
+      final EventID id = (EventID) iterator.next();
+      final boolean queueReady = hrq != null && hrq.isQueueInitialized();
+      if (queueReady) {
+        if (!removeQueueEvent(regionName, hrq, id)) {
+          return false;
+        }
+      } else if (logger.isDebugEnabled()) {
+        logger.debug("QueueRemovalMessage: hrq is not ready when trying to remove "
+            + "dispatched event on queue {} for {}", regionName, id);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Asks the queue to remove the dispatched events for the given event ID, inline on the calling
+   * thread (fix for bug 39516: inline removal of events by QRM).
+   *
+   * <p>
+   * A destroyed region, a {@link CacheException} and a {@link RejectedExecutionException} are
+   * logged or recorded and do not stop processing. A {@link CancelException} or an
+   * {@link InterruptedException} stops it.
+   *
+   * @param regionName name of the HARegion, used for logging
+   * @param hrq queue to remove the event from
+   * @param id event ID passed to {@code removeDispatchedEvents}
+   * @return {@code false} if the caller should stop processing the message, {@code true} if it
+   *         may continue with the next event
+   */
+  boolean removeQueueEvent(String regionName, HARegionQueue hrq, EventID id) {
+    // Clear and remember any pending interrupt; it is restored in the finally block.
+    boolean interrupted = Thread.interrupted();
+    try {
+      if (logger.isTraceEnabled()) {
+        logger.trace("QueueRemovalMessage: removing dispatched events on queue {} for {}",
+            regionName, id);
+      }
+      hrq.removeDispatchedEvents(id);
+    } catch (RegionDestroyedException ignore) {
+      logger.info(
+          "Queue found destroyed while processing the last dispatched sequence ID for a HARegionQueue's DACE. The event ID is {} for HARegion with name={}",
+          new Object[] {id, regionName});
+    } catch (CancelException ignore) {
+      return false;
+    } catch (CacheException e) {
+      // Argument order follows the placeholders: event ID first, then region name.
+      logger.error(String.format(
+          "QueueRemovalMessage::process:Exception in processing the last dispatched sequence ID for a HARegionQueue's DACE. The problem is with event ID, %s for HARegion with name=%s",
+          id, regionName),
+          e);
+    } catch (InterruptedException ignore) {
+      Thread.currentThread().interrupt();
+      return false;
+    } catch (RejectedExecutionException ignore) {
+      interrupted = true;
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    return true;
+  }
@Override
@@ -190,18 +205,18 @@ public class QueueRemovalMessage extends PooledDistributionMessage {
super.fromData(in, context);
// read the size of the message
int size = DataSerializer.readInteger(in);
- this.messagesList = new LinkedList();
+ messagesList = new LinkedList<>();
int eventIdSizeInt;
for (int i = 0; i < size; i++) {
// read the region name
- this.messagesList.add(DataSerializer.readString(in));
- // read the datasize
+ messagesList.add(DataSerializer.readString(in));
+ // read the data size
Integer eventIdSize = DataSerializer.readInteger(in);
- this.messagesList.add(eventIdSize);
+ messagesList.add(eventIdSize);
eventIdSizeInt = eventIdSize;
// read the total number of events
for (int j = 0; j < eventIdSizeInt; j++) {
- this.messagesList.add(DataSerializer.readObject(in));
+ messagesList.add(DataSerializer.readObject(in));
}
// increment i by adding the total number of ids read and 1 for
// the length of the message
@@ -212,6 +227,6 @@ public class QueueRemovalMessage extends PooledDistributionMessage {
@Override
public String toString() {
- return "QueueRemovalMessage" + this.messagesList;
+ return "QueueRemovalMessage" + messagesList;
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/HARegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/HARegionTest.java
new file mode 100644
index 0000000..0d050ae
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/HARegionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAlgorithm;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.internal.cache.ha.HARegionQueue;
+
+/**
+ * Unit tests for {@link HARegion#getOwnerWithWait(long)} using a mocked owning queue.
+ */
+public class HARegionTest {
+  private static final long TIMEOUT_MILLIS = 1;
+
+  private final InternalCache cache = mock(InternalCache.class, RETURNS_DEEP_STUBS);
+  private final RegionAttributes attributes = mock(RegionAttributes.class, RETURNS_DEEP_STUBS);
+  private final EvictionAttributes evictionAttributes =
+      mock(EvictionAttributes.class, RETURNS_DEEP_STUBS);
+
+  private HARegion region;
+
+  @Before
+  public void setup() {
+    // Stub the attributes that are read while the region is constructed.
+    when(attributes.getEvictionAttributes()).thenReturn(evictionAttributes);
+    when(attributes.getLoadFactor()).thenReturn(0.75f);
+    when(attributes.getConcurrencyLevel()).thenReturn(16);
+    when(evictionAttributes.getAlgorithm()).thenReturn(EvictionAlgorithm.NONE);
+    when(evictionAttributes.getAction()).thenReturn(EvictionAction.NONE);
+    Set<String> asyncEventQueueIds = Collections.singleton("id");
+    when(attributes.getAsyncEventQueueIds()).thenReturn(asyncEventQueueIds);
+    region = new HARegion("HARegionTest_region", attributes, null, cache, disabledClock());
+  }
+
+  @Test
+  public void getOwnerWithWaitReturnsHARegionQueueIfInitializedWithWait() {
+    HARegionQueue owner = installOwner(true);
+
+    assertThat(region.getOwnerWithWait(TIMEOUT_MILLIS)).isEqualTo(owner);
+  }
+
+  @Test
+  public void getOwnerWithWaitReturnsNullIfNotInitializedWithWait() {
+    installOwner(false);
+
+    assertThat(region.getOwnerWithWait(TIMEOUT_MILLIS)).isNull();
+  }
+
+  /**
+   * Creates a mocked queue whose timed initialization check returns the given answer and makes
+   * it the owner of the region under test.
+   */
+  private HARegionQueue installOwner(boolean initializedAfterWait) {
+    HARegionQueue owner = mock(HARegionQueue.class);
+    when(owner.isQueueInitializedWithWait(TIMEOUT_MILLIS)).thenReturn(initializedAfterWait);
+    region.setOwner(owner);
+    return owner;
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
index 67a0a1a..f490675 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
@@ -15,8 +15,13 @@
package org.apache.geode.internal.cache.ha;
import static junit.framework.TestCase.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -144,4 +149,38 @@ public class HARegionQueueTest {
assertEquals(3, haRegionQueue.size());
}
+  // An already initialized queue must answer true without ever entering the wait.
+  @Test
+  public void isQueueInitializedWithWaitDoesNotWaitIfInitialized() throws Exception {
+    final long waitMillis = 1;
+    final HARegionQueue queueSpy = spy(haRegionQueue);
+    doReturn(true).when(queueSpy).isQueueInitialized();
+
+    final boolean initialized = queueSpy.isQueueInitializedWithWait(waitMillis);
+
+    assertThat(initialized).isTrue();
+    verify(queueSpy, never()).waitForInitialized(waitMillis);
+  }
+
+  // Not initialized on the first check, initialized on the re-check after the (stubbed) wait.
+  @Test
+  public void isQueueInitializedWithWaitWillWaitIfNotInitialized() throws Exception {
+    final long waitMillis = 1;
+    final HARegionQueue queueSpy = spy(haRegionQueue);
+    doReturn(false, true).when(queueSpy).isQueueInitialized();
+    doNothing().when(queueSpy).waitForInitialized(waitMillis);
+
+    final boolean initialized = queueSpy.isQueueInitializedWithWait(waitMillis);
+
+    assertThat(initialized).isTrue();
+    verify(queueSpy).waitForInitialized(waitMillis);
+  }
+
+  // Still not initialized after the (stubbed) wait: the method must wait once and answer false.
+  @Test
+  public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() throws Exception {
+    final long waitMillis = 1;
+    final HARegionQueue queueSpy = spy(haRegionQueue);
+    doReturn(false, false).when(queueSpy).isQueueInitialized();
+    doNothing().when(queueSpy).waitForInitialized(waitMillis);
+
+    final boolean initialized = queueSpy.isQueueInitializedWithWait(waitMillis);
+
+    assertThat(initialized).isFalse();
+    verify(queueSpy).waitForInitialized(waitMillis);
+  }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
new file mode 100644
index 0000000..07f99b7
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+
+/**
+ * Unit tests for {@link QueueRemovalMessage}, run against a Mockito spy of the message so that
+ * calls between its own methods ({@code processRegionQueues}, {@code processRegionQueue},
+ * {@code removeQueueEvent}) can be stubbed and verified.
+ */
+public class QueueRemovalMessageTest {
+ private QueueRemovalMessage queueRemovalMessage;
+ private List<Object> messagesList;
+
+ private final ClusterDistributionManager dm = mock(ClusterDistributionManager.class);
+ private final InternalCache cache = mock(InternalCache.class);
+ private final String regionName1 = "region1";
+ private final String regionName2 = "region2";
+ private final HARegion region1 = mock(HARegion.class);
+ private final HARegion region2 = mock(HARegion.class);
+ private final HARegionQueue regionQueue1 = mock(HARegionQueue.class);
+ private final HARegionQueue regionQueue2 = mock(HARegionQueue.class);
+ private final EventID eventID1 = mock(EventID.class);
+ private final EventID eventID2 = mock(EventID.class);
+ private final EventID eventID3 = mock(EventID.class);
+ // Number of event IDs added for each region by addToMessagesList().
+ private final int region1EventSize = 1;
+ private final int region2EventSize = 2;
+
+ // Default fixture: both regions exist in the cache and both queues report initialized.
+ @Before
+ public void setup() {
+ queueRemovalMessage = spy(new QueueRemovalMessage());
+ messagesList = new LinkedList<>();
+ queueRemovalMessage.setMessagesList(messagesList);
+
+ // Same value as the wait time hard-coded in QueueRemovalMessage.processRegionQueues.
+ long maxWaitTimeForInitialization = 30000;
+ when(cache.getRegion(regionName1)).thenReturn(uncheckedCast(region1));
+ when(cache.getRegion(regionName2)).thenReturn(uncheckedCast(region2));
+ when(region1.getOwnerWithWait(maxWaitTimeForInitialization)).thenReturn(regionQueue1);
+ when(region2.getOwnerWithWait(maxWaitTimeForInitialization)).thenReturn(regionQueue2);
+ when(regionQueue1.isQueueInitialized()).thenReturn(true);
+ when(regionQueue2.isQueueInitialized()).thenReturn(true);
+ }
+
+ @Test
+ public void messageProcessInvokesProcessRegionQueues() {
+ when(dm.getCache()).thenReturn(cache);
+
+ queueRemovalMessage.process(dm);
+
+ verify(queueRemovalMessage).processRegionQueues(eq(cache), any(Iterator.class));
+ }
+
+ @Test
+ public void processRegionQueuesCanProcessEachRegionQueue() {
+ addToMessagesList();
+ Iterator iterator = messagesList.iterator();
+
+ queueRemovalMessage.processRegionQueues(cache, iterator);
+
+ verify(queueRemovalMessage).processRegionQueue(iterator, regionName1, region1EventSize,
+ regionQueue1);
+ verify(queueRemovalMessage).processRegionQueue(iterator, regionName2, region2EventSize,
+ regionQueue2);
+ verify(queueRemovalMessage).removeQueueEvent(regionName1, regionQueue1, eventID1);
+ verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, eventID2);
+ verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, eventID3);
+ }
+
+ // Fills the list in the order processRegionQueues reads it: region name, event count, then
+ // that many event IDs, repeated per region.
+ private void addToMessagesList() {
+ messagesList.add(regionName1);
+ messagesList.add(region1EventSize);
+ messagesList.add(eventID1);
+ messagesList.add(regionName2);
+ messagesList.add(region2EventSize);
+ messagesList.add(eventID2);
+ messagesList.add(eventID3);
+ }
+
+ // A region missing from the cache is processed with a null queue and the next region is
+ // still handled.
+ @Test
+ public void canProcessRegionQueuesWithoutHARegionInCache() {
+ addToMessagesList();
+ Iterator iterator = messagesList.iterator();
+ when(cache.getRegion(regionName1)).thenReturn(null);
+
+ queueRemovalMessage.processRegionQueues(cache, iterator);
+
+ verify(queueRemovalMessage).processRegionQueue(iterator, regionName1, region1EventSize, null);
+ verify(queueRemovalMessage).processRegionQueue(iterator, regionName2, region2EventSize,
+ regionQueue2);
+ verify(queueRemovalMessage, never()).removeQueueEvent(regionName1, regionQueue1, eventID1);
+ verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, eventID2);
+ verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, eventID3);
+ }
+
+ // A queue that reports not initialized has its events skipped rather than removed.
+ @Test
+ public void canProcessRegionQueuesWhenHARegionQueueIsNotInitialized() {
+ addToMessagesList();
+ Iterator iterator = messagesList.iterator();
+ when(regionQueue2.isQueueInitialized()).thenReturn(false);
+
+ queueRemovalMessage.processRegionQueues(cache, iterator);
+
+ verify(queueRemovalMessage).processRegionQueue(iterator, regionName1, region1EventSize,
+ regionQueue1);
+ verify(queueRemovalMessage).processRegionQueue(iterator, regionName2, region2EventSize,
+ regionQueue2);
+ verify(queueRemovalMessage).removeQueueEvent(regionName1, regionQueue1, eventID1);
+ verify(queueRemovalMessage, never()).removeQueueEvent(regionName2, regionQueue2, eventID2);
+ verify(queueRemovalMessage, never()).removeQueueEvent(regionName2, regionQueue2, eventID3);
+ }
+
+ @Test
+ public void processRegionQueuesStopsIfProcessRegionQueueReturnsFalse() {
+ addToMessagesList();
+ Iterator iterator = messagesList.iterator();
+ doReturn(false).when(queueRemovalMessage).processRegionQueue(iterator, regionName1,
+ region1EventSize, regionQueue1);
+
+ queueRemovalMessage.processRegionQueues(cache, iterator);
+
+ verify(queueRemovalMessage).processRegionQueue(iterator, regionName1, region1EventSize,
+ regionQueue1);
+ verify(queueRemovalMessage, never()).processRegionQueue(iterator, regionName2, region2EventSize,
+ regionQueue2);
+ }
+
+ @Test
+ public void processRegionQueueReturnsFalseIfRemoveQueueEventReturnsFalse() {
+ // Only the event ID is added: processRegionQueue reads event IDs, not the name or count.
+ messagesList.add(eventID1);
+ Iterator iterator = messagesList.iterator();
+ doReturn(false).when(queueRemovalMessage).removeQueueEvent(regionName1, regionQueue1, eventID1);
+
+ assertThat(queueRemovalMessage.processRegionQueue(iterator, regionName1, region1EventSize,
+ regionQueue1)).isFalse();
+ }
+
+ @Test
+ public void removeQueueEventRemovesEvents() throws Exception {
+ assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isTrue();
+
+ verify(regionQueue2).removeDispatchedEvents(eventID2);
+ }
+
+ // The remaining tests pin the return value of removeQueueEvent for each exception thrown by
+ // removeDispatchedEvents.
+ @Test
+ public void removeQueueEventReturnsTrueIfRemovalThrowsCacheException() throws Exception {
+ doThrow(new EntryNotFoundException("")).when(regionQueue2).removeDispatchedEvents(eventID2);
+
+ assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isTrue();
+ }
+
+ @Test
+ public void removeQueueEventReturnsTrueIfRemovalThrowsRegionDestroyedException()
+ throws Exception {
+ doThrow(new RegionDestroyedException("", "")).when(regionQueue2)
+ .removeDispatchedEvents(eventID2);
+
+ assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isTrue();
+ }
+
+ @Test
+ public void removeQueueEventReturnsFalseIfRemovalThrowsCancelException() throws Exception {
+ doThrow(new CacheClosedException()).when(regionQueue2).removeDispatchedEvents(eventID2);
+
+ assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isFalse();
+ }
+
+ @Test
+ public void removeQueueEventReturnsFalseIfRemovalThrowsInterruptedException() throws Exception {
+ doThrow(new InterruptedException()).when(regionQueue2).removeDispatchedEvents(eventID2);
+
+ assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isFalse();
+ }
+
+ @Test
+ public void removeQueueEventReturnsTrueIfRemovalThrowsRejectedExecutionException()
+ throws Exception {
+ doThrow(new RejectedExecutionException()).when(regionQueue2).removeDispatchedEvents(eventID2);
+
+ assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isTrue();
+ }
+}