You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2020/07/29 19:26:45 UTC

[geode] branch support/1.13 updated: GEODE-8323: Process QueueRemovalMessage after queue initialized. (#5333)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new 3c87e10  GEODE-8323: Process QueueRemovalMessage after queue initialized. (#5333)
3c87e10 is described below

commit 3c87e1059c11d52cb2602d76765a4118b86ef8cb
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Tue Jul 7 08:50:57 2020 -0700

    GEODE-8323: Process QueueRemovalMessage after queue initialized. (#5333)
    
    
    (cherry picked from commit 625a204f1a7173323648c05bb0002aded06e2a69)
---
 .../org/apache/geode/internal/cache/HARegion.java  |  15 +-
 .../geode/internal/cache/ha/HARegionQueue.java     |  46 ++++-
 .../internal/cache/ha/QueueRemovalMessage.java     | 169 ++++++++--------
 .../apache/geode/internal/cache/HARegionTest.java  |  77 ++++++++
 .../geode/internal/cache/ha/HARegionQueueTest.java |  39 ++++
 .../internal/cache/ha/QueueRemovalMessageTest.java | 215 +++++++++++++++++++++
 6 files changed, 474 insertions(+), 87 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
index 1ab0dbd..98f6fb5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
@@ -20,6 +20,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.Logger;
 
@@ -92,7 +93,7 @@ public class HARegion extends DistributedRegion {
 
   private volatile HARegionQueue owningQueue;
 
-  private HARegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion,
+  HARegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion,
       InternalCache cache, StatisticsClock statisticsClock) {
     super(regionName, attrs, parentRegion, cache,
         new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
@@ -274,6 +275,18 @@ public class HARegion extends DistributedRegion {
     return this.owningQueue.isQueueInitialized() ? this.owningQueue : null;
   }
 
+  public HARegionQueue getOwnerWithWait(long timeout) {
+    if (owningQueue.isQueueInitializedWithWait(timeout)) {
+      return owningQueue;
+    } else {
+      if (logger.isDebugEnabled()) {
+        logger.debug("After waiting for {} seconds, queue is still not initialized",
+            TimeUnit.MILLISECONDS.toSeconds(timeout));
+      }
+      return null;
+    }
+  }
+
   @Override
   public CachePerfStats getCachePerfStats() {
     return this.haRegionStats;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 5d61284..77e10248e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -335,7 +335,6 @@ public class HARegionQueue implements RegionQueue {
       ClientProxyMembershipID clientProxyId, final byte clientConflation, boolean isPrimary,
       StatisticsClock statisticsClock)
       throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-
     String processedRegionName = createRegionName(regionName);
 
     // Initialize the statistics
@@ -374,7 +373,10 @@ public class HARegionQueue implements RegionQueue {
     putGIIDataInRegion();
 
     if (this.getClass() == HARegionQueue.class) {
-      initialized.set(true);
+      synchronized (initialized) {
+        initialized.set(true);
+        initialized.notifyAll();
+      }
     }
   }
 
@@ -407,10 +409,31 @@ public class HARegionQueue implements RegionQueue {
       putGIIDataInRegion();
     }
     if (this.getClass() == HARegionQueue.class) {
-      initialized.set(true);
+      synchronized (initialized) {
+        initialized.set(true);
+        initialized.notifyAll();
+      }
     }
   }
 
+  public boolean isQueueInitializedWithWait(long waitTime) {
+    try {
+      synchronized (initialized) {
+        if (!isQueueInitialized()) {
+          waitForInitialized(waitTime);
+        }
+      }
+    } catch (InterruptedException interruptedException) {
+      Thread.currentThread().interrupt();
+      return false;
+    }
+    return isQueueInitialized();
+  }
+
+  void waitForInitialized(long waitTime) throws InterruptedException {
+    initialized.wait(waitTime);
+  }
+
   /**
    * install DACE information from an initial image provider
    */
@@ -2215,7 +2238,10 @@ public class HARegionQueue implements RegionQueue {
 
       super.putGIIDataInRegion();
       if (this.getClass() == BlockingHARegionQueue.class) {
-        initialized.set(true);
+        synchronized (initialized) {
+          initialized.set(true);
+          initialized.notifyAll();
+        }
       }
     }
 
@@ -2438,12 +2464,14 @@ public class HARegionQueue implements RegionQueue {
         throws IOException, ClassNotFoundException, CacheException, InterruptedException {
       super(regionName, cache, hrqa, haContainer, clientProxyId, clientConflation, isPrimary,
           statisticsClock);
+      threadIdToSeqId.keepPrevAcks = true;
+      durableIDsList = new LinkedHashSet();
+      ackedEvents = new HashMap();
 
-      this.threadIdToSeqId.keepPrevAcks = true;
-      this.durableIDsList = new LinkedHashSet();
-      this.ackedEvents = new HashMap();
-      this.initialized.set(true);
-
+      synchronized (initialized) {
+        initialized.set(true);
+        initialized.notifyAll();
+      }
     }
 
     @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
index 297aa8c..f789411 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
@@ -14,7 +14,7 @@
  */
 package org.apache.geode.internal.cache.ha;
 
-import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -36,7 +36,6 @@ import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.HARegion;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
 import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.SerializationContext;
 import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -52,20 +51,20 @@ public class QueueRemovalMessage extends PooledDistributionMessage {
   /**
    * List of messages (String[] )
    */
-  private List messagesList;
+  private List<Object> messagesList;
 
   /**
    * Constructor : Set the recipient list to ALL_RECIPIENTS
    */
   public QueueRemovalMessage() {
-    this.setRecipient(ALL_RECIPIENTS);
+    setRecipient(ALL_RECIPIENTS);
   }
 
   /**
    * Set the message list
    */
-  public void setMessagesList(List messages) {
-    this.messagesList = messages;
+  void setMessagesList(List messages) {
+    this.messagesList = uncheckedCast(messages);
   }
 
   /**
@@ -76,72 +75,88 @@ public class QueueRemovalMessage extends PooledDistributionMessage {
   protected void process(ClusterDistributionManager dm) {
     final InternalCache cache = dm.getCache();
     if (cache != null) {
-      Iterator iterator = this.messagesList.iterator();
-      final InitializationLevel oldLevel =
-          LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
-      try {
-        while (iterator.hasNext()) {
-          final String regionName = (String) iterator.next();
-          final int size = (Integer) iterator.next();
-          final LocalRegion region = (LocalRegion) cache.getRegion(regionName);
-          final HARegionQueue hrq;
-          if (region == null || !region.isInitialized()) {
-            hrq = null;
-          } else {
-            HARegionQueue tmp = ((HARegion) region).getOwner();
-            if (tmp != null && tmp.isQueueInitialized()) {
-              hrq = tmp;
-            } else {
-              hrq = null;
-            }
-          }
-          // we have to iterate even if the hrq isn't available since there are
-          // a bunch of event IDs to go through
-          for (int i = 0; i < size; i++) {
-            final EventID id = (EventID) iterator.next();
-            boolean interrupted = Thread.interrupted();
-            if (hrq == null || !hrq.isQueueInitialized()) {
-              continue;
-            }
-            try {
-              // Fix for bug 39516: inline removal of events by QRM.
-              // dm.getWaitingThreadPool().execute(new Runnable() {
-              // public void run()
-              // {
-              try {
-                if (logger.isTraceEnabled()) {
-                  logger.trace("QueueRemovalMessage: removing dispatched events on queue {} for {}",
-                      regionName, id);
-                }
-                hrq.removeDispatchedEvents(id);
-              } catch (RegionDestroyedException ignore) {
-                logger.info(
-                    "Queue found destroyed while processing the last disptached sequence ID for a HARegionQueue's DACE. The event ID is {} for HARegion with name={}",
-                    new Object[] {id, regionName});
-              } catch (CancelException ignore) {
-                return; // cache or DS is closing
-              } catch (CacheException e) {
-                logger.error(String.format(
-                    "QueueRemovalMessage::process:Exception in processing the last disptached sequence ID for a HARegionQueue's DACE. The problem is with event ID,%s for HARegion with name=%s",
-                    new Object[] {regionName, id}),
-                    e);
-              } catch (InterruptedException ignore) {
-                return; // interrupt occurs during shutdown. this runs in an executor, so just stop
-                        // processing
-              }
-            } catch (RejectedExecutionException ignore) {
-              interrupted = true;
-            } finally {
-              if (interrupted) {
-                Thread.currentThread().interrupt();
-              }
-            }
-          } // if
-        } // for
-      } finally {
-        LocalRegion.setThreadInitLevelRequirement(oldLevel);
+      Iterator iterator = messagesList.iterator();
+      processRegionQueues(cache, iterator);
+    }
+  }
+
+  void processRegionQueues(InternalCache cache, Iterator iterator) {
+    while (iterator.hasNext()) {
+      final String regionName = (String) iterator.next();
+      final int size = (Integer) iterator.next();
+      final LocalRegion region = (LocalRegion) cache.getRegion(regionName);
+      final HARegionQueue hrq;
+      if (region == null) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("processing QRM region {} does not exist.", regionName);
+        }
+        hrq = null;
+      } else {
+        long maxWaitTimeForInitialization = 30000;
+        hrq = ((HARegion) region).getOwnerWithWait(maxWaitTimeForInitialization);
+      }
+      boolean succeed = processRegionQueue(iterator, regionName, size, hrq);
+      if (!succeed) {
+        return;
+      }
+    }
+  }
+
+  boolean processRegionQueue(Iterator iterator, String regionName, int size,
+      HARegionQueue hrq) {
+    // we have to iterate even if the hrq isn't available since there are
+    // a bunch of event IDs to go through
+    for (int i = 0; i < size; i++) {
+      final EventID id = (EventID) iterator.next();
+      if (hrq == null || !hrq.isQueueInitialized()) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("QueueRemovalMessage: hrq is not ready when trying to remove "
+              + "dispatched event on queue {} for {}", regionName, id);
+        }
+        continue;
+      }
+
+      if (!removeQueueEvent(regionName, hrq, id)) {
+        return false;
       }
-    } // cache != null
+    }
+    return true;
+  }
+
+  boolean removeQueueEvent(String regionName, HARegionQueue hrq, EventID id) {
+    // Fix for bug 39516: inline removal of events by QRM.
+    // dm.getWaitingThreadPool().execute(new Runnable() {
+    // public void run()
+    // {
+    boolean interrupted = Thread.interrupted();
+    try {
+      if (logger.isTraceEnabled()) {
+        logger.trace("QueueRemovalMessage: removing dispatched events on queue {} for {}",
+            regionName, id);
+      }
+      hrq.removeDispatchedEvents(id);
+    } catch (RegionDestroyedException ignore) {
+      logger.info(
+          "Queue found destroyed while processing the last dispatched sequence ID for a HARegionQueue's DACE. The event ID is {} for HARegion with name={}",
+          new Object[] {id, regionName});
+    } catch (CancelException ignore) {
+      return false;
+    } catch (CacheException e) {
+      logger.error(String.format(
+          "QueueRemovalMessage::process:Exception in processing the last dispatched sequence ID for a HARegionQueue's DACE. The problem is with event ID, %s for HARegion with name=%s",
+          regionName, id),
+          e);
+    } catch (InterruptedException ignore) {
+      Thread.currentThread().interrupt();
+      return false;
+    } catch (RejectedExecutionException ignore) {
+      interrupted = true;
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    return true;
   }
 
   @Override
@@ -190,18 +205,18 @@ public class QueueRemovalMessage extends PooledDistributionMessage {
     super.fromData(in, context);
     // read the size of the message
     int size = DataSerializer.readInteger(in);
-    this.messagesList = new LinkedList();
+    messagesList = new LinkedList<>();
     int eventIdSizeInt;
     for (int i = 0; i < size; i++) {
       // read the region name
-      this.messagesList.add(DataSerializer.readString(in));
-      // read the datasize
+      messagesList.add(DataSerializer.readString(in));
+      // read the data size
       Integer eventIdSize = DataSerializer.readInteger(in);
-      this.messagesList.add(eventIdSize);
+      messagesList.add(eventIdSize);
       eventIdSizeInt = eventIdSize;
       // read the total number of events
       for (int j = 0; j < eventIdSizeInt; j++) {
-        this.messagesList.add(DataSerializer.readObject(in));
+        messagesList.add(DataSerializer.readObject(in));
       }
       // increment i by adding the total number of ids read and 1 for
       // the length of the message
@@ -212,6 +227,6 @@ public class QueueRemovalMessage extends PooledDistributionMessage {
 
   @Override
   public String toString() {
-    return "QueueRemovalMessage" + this.messagesList;
+    return "QueueRemovalMessage" + messagesList;
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/HARegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/HARegionTest.java
new file mode 100644
index 0000000..0d050ae
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/HARegionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAlgorithm;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.internal.cache.ha.HARegionQueue;
+
+public class HARegionTest {
+  private HARegion region;
+
+  private final InternalCache cache = mock(InternalCache.class, RETURNS_DEEP_STUBS);
+  private final RegionAttributes attributes = mock(RegionAttributes.class, RETURNS_DEEP_STUBS);
+  private final EvictionAttributes evictionAttributes =
+      mock(EvictionAttributes.class, RETURNS_DEEP_STUBS);
+
+  @Before
+  public void setup() {
+    when(attributes.getEvictionAttributes()).thenReturn(evictionAttributes);
+    when(attributes.getLoadFactor()).thenReturn(0.75f);
+    when(attributes.getConcurrencyLevel()).thenReturn(16);
+    when(evictionAttributes.getAlgorithm()).thenReturn(EvictionAlgorithm.NONE);
+    when(evictionAttributes.getAction()).thenReturn(EvictionAction.NONE);
+    Set<String> asyncEventQueueIds = Collections.singleton("id");
+    when(attributes.getAsyncEventQueueIds()).thenReturn(asyncEventQueueIds);
+    region = new HARegion("HARegionTest_region", attributes, null, cache, disabledClock());
+  }
+
+  @Test
+  public void getOwnerWithWaitReturnsHARegionQueueIfInitializedWithWait() {
+    long timeout = 1;
+    HARegionQueue queue = mock(HARegionQueue.class);
+    when(queue.isQueueInitializedWithWait(timeout)).thenReturn(true);
+
+    region.setOwner(queue);
+
+    assertThat(region.getOwnerWithWait(timeout)).isEqualTo(queue);
+  }
+
+  @Test
+  public void getOwnerWithWaitReturnsNullIfNotInitializedWithWait() {
+    long timeout = 1;
+    HARegionQueue queue = mock(HARegionQueue.class);
+    when(queue.isQueueInitializedWithWait(timeout)).thenReturn(false);
+
+    region.setOwner(queue);
+
+    assertThat(region.getOwnerWithWait(timeout)).isEqualTo(null);
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
index 67a0a1a..f490675 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
@@ -15,8 +15,13 @@
 package org.apache.geode.internal.cache.ha;
 
 import static junit.framework.TestCase.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -144,4 +149,38 @@ public class HARegionQueueTest {
     assertEquals(3, haRegionQueue.size());
   }
 
+  @Test
+  public void isQueueInitializedWithWaitDoesNotWaitIfInitialized() throws Exception {
+    long time = 1;
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(true).when(spy).isQueueInitialized();
+
+    assertThat(spy.isQueueInitializedWithWait(time)).isTrue();
+
+    verify(spy, never()).waitForInitialized(time);
+  }
+
+  @Test
+  public void isQueueInitializedWithWaitWillWaitIfNotInitialized() throws Exception {
+    long time = 1;
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(false).doReturn(true).when(spy).isQueueInitialized();
+    doNothing().when(spy).waitForInitialized(time);
+
+    assertThat(spy.isQueueInitializedWithWait(time)).isTrue();
+
+    verify(spy).waitForInitialized(time);
+  }
+
+  @Test
+  public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() throws Exception {
+    long time = 1;
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(false).doReturn(false).when(spy).isQueueInitialized();
+    doNothing().when(spy).waitForInitialized(time);
+
+    assertThat(spy.isQueueInitializedWithWait(time)).isFalse();
+
+    verify(spy).waitForInitialized(time);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
new file mode 100644
index 0000000..07f99b7
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+
+public class QueueRemovalMessageTest {
+  private QueueRemovalMessage queueRemovalMessage;
+  private List<Object> messagesList;
+
+  private final ClusterDistributionManager dm = mock(ClusterDistributionManager.class);
+  private final InternalCache cache = mock(InternalCache.class);
+  private final String regionName1 = "region1";
+  private final String regionName2 = "region2";
+  private final HARegion region1 = mock(HARegion.class);
+  private final HARegion region2 = mock(HARegion.class);
+  private final HARegionQueue regionQueue1 = mock(HARegionQueue.class);
+  private final HARegionQueue regionQueue2 = mock(HARegionQueue.class);
+  private final EventID eventID1 = mock(EventID.class);
+  private final EventID eventID2 = mock(EventID.class);
+  private final EventID eventID3 = mock(EventID.class);
+  private final int region1EventSize = 1;
+  private final int region2EventSize = 2;
+
+  @Before
+  public void setup() {
+    queueRemovalMessage = spy(new QueueRemovalMessage());
+    messagesList = new LinkedList<>();
+    queueRemovalMessage.setMessagesList(messagesList);
+
+    long maxWaitTimeForInitialization = 30000;
+    when(cache.getRegion(regionName1)).thenReturn(uncheckedCast(region1));
+    when(cache.getRegion(regionName2)).thenReturn(uncheckedCast(region2));
+    when(region1.getOwnerWithWait(maxWaitTimeForInitialization)).thenReturn(regionQueue1);
+    when(region2.getOwnerWithWait(maxWaitTimeForInitialization)).thenReturn(regionQueue2);
+    when(regionQueue1.isQueueInitialized()).thenReturn(true);
+    when(regionQueue2.isQueueInitialized()).thenReturn(true);
+  }
+
+  @Test
+  public void messageProcessInvokesProcessRegionQueues() {
+    when(dm.getCache()).thenReturn(cache);
+
+    queueRemovalMessage.process(dm);
+
+    verify(queueRemovalMessage).processRegionQueues(eq(cache), any(Iterator.class));
+  }
+
+  @Test
+  public void processRegionQueuesCanProcessEachRegionQueue() {
+    addToMessagesList();
+    Iterator iterator = messagesList.iterator();
+
+    queueRemovalMessage.processRegionQueues(cache, iterator);
+
+    verify(queueRemovalMessage).processRegionQueue(iterator, regionName1, region1EventSize,
+        regionQueue1);
+    verify(queueRemovalMessage).processRegionQueue(iterator, regionName2, region2EventSize,
+        regionQueue2);
+    verify(queueRemovalMessage).removeQueueEvent(regionName1, regionQueue1, eventID1);
+    verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, eventID2);
+    verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, eventID3);
+  }
+
+  private void addToMessagesList() {
+    messagesList.add(regionName1);
+    messagesList.add(region1EventSize);
+    messagesList.add(eventID1);
+    messagesList.add(regionName2);
+    messagesList.add(region2EventSize);
+    messagesList.add(eventID2);
+    messagesList.add(eventID3);
+  }
+
+  @Test
+  public void canProcessRegionQueuesWithoutHARegionInCache() {
+    addToMessagesList();
+    Iterator iterator = messagesList.iterator();
+    when(cache.getRegion(regionName1)).thenReturn(null);
+
+    queueRemovalMessage.processRegionQueues(cache, iterator);
+
+    verify(queueRemovalMessage).processRegionQueue(iterator, regionName1, region1EventSize, null);
+    verify(queueRemovalMessage).processRegionQueue(iterator, regionName2, region2EventSize,
+        regionQueue2);
+    verify(queueRemovalMessage, never()).removeQueueEvent(regionName1, regionQueue1, eventID1);
+    verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, eventID2);
+    verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, eventID3);
+  }
+
+  @Test
+  public void canProcessRegionQueuesWhenHARegionQueueIsNotInitialized() {
+    addToMessagesList();
+    Iterator iterator = messagesList.iterator();
+    when(regionQueue2.isQueueInitialized()).thenReturn(false);
+
+    queueRemovalMessage.processRegionQueues(cache, iterator);
+
+    verify(queueRemovalMessage).processRegionQueue(iterator, regionName1, region1EventSize,
+        regionQueue1);
+    verify(queueRemovalMessage).processRegionQueue(iterator, regionName2, region2EventSize,
+        regionQueue2);
+    verify(queueRemovalMessage).removeQueueEvent(regionName1, regionQueue1, eventID1);
+    verify(queueRemovalMessage, never()).removeQueueEvent(regionName2, regionQueue2, eventID2);
+    verify(queueRemovalMessage, never()).removeQueueEvent(regionName2, regionQueue2, eventID3);
+  }
+
+  @Test
+  public void processRegionQueuesStopsIfProcessRegionQueueReturnsFalse() {
+    addToMessagesList();
+    Iterator iterator = messagesList.iterator();
+    doReturn(false).when(queueRemovalMessage).processRegionQueue(iterator, regionName1,
+        region1EventSize, regionQueue1);
+
+    queueRemovalMessage.processRegionQueues(cache, iterator);
+
+    verify(queueRemovalMessage).processRegionQueue(iterator, regionName1, region1EventSize,
+        regionQueue1);
+    verify(queueRemovalMessage, never()).processRegionQueue(iterator, regionName2, region2EventSize,
+        regionQueue2);
+  }
+
+  @Test
+  public void processRegionQueueReturnsFalseIfRemoveQueueEventReturnsFalse() {
+    messagesList.add(eventID1);
+    Iterator iterator = messagesList.iterator();
+    doReturn(false).when(queueRemovalMessage).removeQueueEvent(regionName1, regionQueue1, eventID1);
+
+    assertThat(queueRemovalMessage.processRegionQueue(iterator, regionName1, region1EventSize,
+        regionQueue1)).isFalse();
+  }
+
+  @Test
+  public void removeQueueEventRemovesEvents() throws Exception {
+    assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isTrue();
+
+    verify(regionQueue2).removeDispatchedEvents(eventID2);
+  }
+
+  @Test
+  public void removeQueueEventReturnsTrueIfRemovalThrowsCacheException() throws Exception {
+    doThrow(new EntryNotFoundException("")).when(regionQueue2).removeDispatchedEvents(eventID2);
+
+    assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isTrue();
+  }
+
+  @Test
+  public void removeQueueEventReturnsTrueIfRemovalThrowsRegionDestroyedException()
+      throws Exception {
+    doThrow(new RegionDestroyedException("", "")).when(regionQueue2)
+        .removeDispatchedEvents(eventID2);
+
+    assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isTrue();
+  }
+
+  @Test
+  public void removeQueueEventReturnsFalseIfRemovalThrowsCancelException() throws Exception {
+    doThrow(new CacheClosedException()).when(regionQueue2).removeDispatchedEvents(eventID2);
+
+    assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isFalse();
+  }
+
+  @Test
+  public void removeQueueEventReturnsFalseIfRemovalThrowsInterruptedException() throws Exception {
+    doThrow(new InterruptedException()).when(regionQueue2).removeDispatchedEvents(eventID2);
+
+    assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isFalse();
+  }
+
+  @Test
+  public void removeQueueEventReturnsTrueIfRemovalThrowsRejectedExecutionException()
+      throws Exception {
+    doThrow(new RejectedExecutionException()).when(regionQueue2).removeDispatchedEvents(eventID2);
+
+    assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isTrue();
+  }
+}