You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jc...@apache.org on 2021/03/11 20:20:30 UTC

[geode] branch support/1.14 updated: GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104)

This is an automated email from the ASF dual-hosted git repository.

jchen21 pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.14 by this push:
     new 6cd319c  GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104)
6cd319c is described below

commit 6cd319cf58e9cbc5dba8be35fcd0d786929f2413
Author: Jianxia Chen <11...@users.noreply.github.com>
AuthorDate: Thu Mar 11 11:19:39 2021 -0800

    GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104)
    
    For PutAll and RemoveAll, when removing destroy token from CQ result keys, use the keys in the individual entry events, instead of using the key in the base event.
    
    (cherry picked from commit 4e0c8aa6937ad2b5935a11994138381fa29a8644)
---
 .../internal/cache/DistributedCacheOperation.java  |  25 ++--
 .../internal/cache/DistributedPutAllOperation.java |  20 +++
 .../cache/DistributedRemoveAllOperation.java       |  20 +++
 .../cache/DistributedCacheOperationTest.java       |  24 ++++
 .../cache/DistributedPutAllOperationTest.java      |  51 ++++++++
 .../cache/DistributedRemoveAllOperationTest.java   |  51 ++++++++
 .../dunit/PartitionedRegionCqQueryDUnitTest.java   | 136 +++++++++++++++++++++
 7 files changed, 317 insertions(+), 10 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 3e55045..b2f2767 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -719,7 +719,7 @@ public abstract class DistributedCacheOperation {
   private void removeDestroyTokensFromCqResultKeys(FilterRoutingInfo filterRouting) {
     for (InternalDistributedMember m : filterRouting.getMembers()) {
       FilterInfo filterInfo = filterRouting.getFilterInfo(m);
-      if (filterInfo.getCQs() == null) {
+      if (filterInfo == null || filterInfo.getCQs() == null) {
         continue;
       }
 
@@ -734,15 +734,20 @@ public abstract class DistributedCacheOperation {
       for (Object value : cf.filterProfile.getCqMap().values()) {
         ServerCQ cq = (ServerCQ) value;
 
-        for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
-          Long cqID = e.getKey();
-          // For the CQs satisfying the event with destroy CQEvent, remove
-          // the entry form CQ cache.
-          if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
-              && (e.getValue().equals(MessageType.LOCAL_DESTROY))) {
-            cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
-          }
-        }
+        doRemoveDestroyTokensFromCqResultKeys(filterInfo, cq);
+      }
+    }
+  }
+
+  void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+    for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+      Long cqID = e.getKey();
+      // For the CQs satisfying the event with destroy CQEvent, remove
+      // the entry from CQ cache.
+      if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+          && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)
+          && ((EntryOperation) event).getKey() != null) {
+        cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
       }
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
index 9a9737e..842d7cd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
@@ -39,6 +40,7 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException;
 import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -46,6 +48,7 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.partitioned.PutAllPRMessage;
+import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.cache.tx.RemotePutAllMessage;
@@ -814,6 +817,23 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation {
     return consolidated;
   }
 
+  @Override
+  void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+    for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+      Long cqID = e.getKey();
+      // For the CQs satisfying the event with destroy CQEvent, remove
+      // the entry from CQ cache.
+      for (int i = 0; i < this.putAllData.length; i++) {
+        @Unretained
+        EntryEventImpl entryEvent = getEventForPosition(i);
+        if (entryEvent != null && entryEvent.getKey() != null && cq != null
+            && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+            && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
+          cq.removeFromCqResultKeys(entryEvent.getKey(), true);
+        }
+      }
+    }
+  }
 
   @Override
   protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
index f76c701..942a4dd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
@@ -33,6 +34,7 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException;
 import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.InternalDataSerializer;
@@ -40,6 +42,7 @@ import org.apache.geode.internal.cache.DistributedPutAllOperation.EntryVersionsL
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.partitioned.RemoveAllPRMessage;
+import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.cache.tx.RemoteRemoveAllMessage;
@@ -584,6 +587,23 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation {
     return consolidated;
   }
 
+  @Override
+  void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+    for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+      Long cqID = e.getKey();
+      // For the CQs satisfying the event with destroy CQEvent, remove
+      // the entry from CQ cache.
+      for (int i = 0; i < this.removeAllData.length; i++) {
+        @Unretained
+        EntryEventImpl entryEvent = getEventForPosition(i);
+        if (entryEvent != null && entryEvent.getKey() != null && cq != null
+            && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+            && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
+          cq.removeFromCqResultKeys(entryEvent.getKey(), true);
+        }
+      }
+    }
+  }
 
   @Override
   protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
index a97fecc..606fac6 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
@@ -17,6 +17,8 @@ package org.apache.geode.internal.cache;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -28,9 +30,11 @@ import java.util.Map;
 import org.junit.Test;
 
 import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.DistributedCacheOperation.CacheOperationMessage;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
+import org.apache.geode.internal.cache.tier.MessageType;
 
 public class DistributedCacheOperationTest {
 
@@ -106,4 +110,24 @@ public class DistributedCacheOperationTest {
       throw new RuntimeException("boom");
     }
   }
+
+  @Test
+  public void testDoRemoveDestroyTokensFromCqResultKeys() {
+    Object key = new Object();
+    HashMap hashMap = new HashMap();
+    hashMap.put(1L, MessageType.LOCAL_DESTROY);
+    EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+    ServerCQ serverCQ = mock(ServerCQ.class);
+    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+    DistributedCacheOperation distributedCacheOperation =
+        new DestroyOperation(baseEvent);
+    when(baseEvent.getKey()).thenReturn(key);
+    when(filterInfo.getCQs()).thenReturn(hashMap);
+    when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+    doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+    distributedCacheOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+    verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
index a06920e..da0c1cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
@@ -15,11 +15,25 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
+
 import org.junit.Test;
 
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.tier.MessageType;
+
 
 public class DistributedPutAllOperationTest {
 
@@ -33,4 +47,41 @@ public class DistributedPutAllOperationTest {
 
     assertThat(mockDistributedPutAllOperation.getBaseEvent()).isSameAs(mockEntryEventImpl);
   }
+
+  @Test
+  public void testDoRemoveDestroyTokensFromCqResultKeys() {
+    EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    BucketRegion bucketRegion = mock(BucketRegion.class);
+    InternalCache internalCache = mock(InternalCache.class);
+    RegionAttributes regionAttributes = mock(RegionAttributes.class);
+    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+    CqService cqService = mock(CqService.class);
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    ServerCQ serverCQ = mock(ServerCQ.class);
+    int putAllPRDataSize = 1;
+    DistributedPutAllOperation distributedPutAllOperation =
+        new DistributedPutAllOperation(baseEvent, putAllPRDataSize, false);
+    Object key = new Object();
+    when(entryEvent.getKey()).thenReturn(key);
+    distributedPutAllOperation.addEntry(entryEvent);
+    HashMap hashMap = new HashMap();
+    hashMap.put(1L, MessageType.LOCAL_DESTROY);
+    when(filterInfo.getCQs()).thenReturn(hashMap);
+    when(baseEvent.getRegion()).thenReturn(bucketRegion);
+    when(bucketRegion.getAttributes()).thenReturn(regionAttributes);
+    when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT);
+    when(bucketRegion.getCache()).thenReturn(internalCache);
+    when(bucketRegion.getPartitionedRegion()).thenReturn(partitionedRegion);
+    when(bucketRegion.getKeyInfo(any(), any(), any())).thenReturn(new KeyInfo(key, null, null));
+    when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalCache.getCqService()).thenReturn(cqService);
+    when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+    doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+    distributedPutAllOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+    verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
index bd84116..7f2eb46 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
@@ -15,11 +15,25 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
+
 import org.junit.Test;
 
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.tier.MessageType;
+
 
 public class DistributedRemoveAllOperationTest {
 
@@ -33,4 +47,41 @@ public class DistributedRemoveAllOperationTest {
 
     assertThat(mockDistributedRemoveAllOperation.getBaseEvent()).isSameAs(mockEntryEventImpl);
   }
+
+  @Test
+  public void testDoRemoveDestroyTokensFromCqResultKeys() {
+    EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    BucketRegion bucketRegion = mock(BucketRegion.class);
+    InternalCache internalCache = mock(InternalCache.class);
+    RegionAttributes regionAttributes = mock(RegionAttributes.class);
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+    CqService cqService = mock(CqService.class);
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    ServerCQ serverCQ = mock(ServerCQ.class);
+    int removeAllPRDataSize = 1;
+    DistributedRemoveAllOperation distributedRemoveAllOperation =
+        new DistributedRemoveAllOperation(baseEvent, removeAllPRDataSize, false);
+    Object key = new Object();
+    when(entryEvent.getKey()).thenReturn(key);
+    distributedRemoveAllOperation.addEntry(entryEvent);
+    HashMap hashMap = new HashMap();
+    hashMap.put(1L, MessageType.LOCAL_DESTROY);
+    when(filterInfo.getCQs()).thenReturn(hashMap);
+    when(baseEvent.getRegion()).thenReturn(bucketRegion);
+    when(bucketRegion.getAttributes()).thenReturn(regionAttributes);
+    when(bucketRegion.getPartitionedRegion()).thenReturn(partitionedRegion);
+    when(bucketRegion.getCache()).thenReturn(internalCache);
+    when(bucketRegion.getKeyInfo(any(), any(), any())).thenReturn(new KeyInfo(key, null, null));
+    when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT);
+    when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalCache.getCqService()).thenReturn(cqService);
+    when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+    doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+    distributedRemoveAllOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+    verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+  }
 }
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
index 3f520f6..6d2e979 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
@@ -22,9 +22,14 @@ import static org.apache.geode.test.dunit.Assert.assertNotNull;
 import static org.apache.geode.test.dunit.Assert.assertNull;
 import static org.apache.geode.test.dunit.Assert.assertTrue;
 import static org.apache.geode.test.dunit.Assert.fail;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -121,6 +126,137 @@ public class PartitionedRegionCqQueryDUnitTest extends JUnit4CacheTestCase {
   private static int bridgeServerPort;
 
   @Test
+  public void testPutAllWithCQLocalDestroy() {
+    VM server1 = getVM(0);
+    VM server2 = getVM(1);
+    VM client = getVM(2);
+
+    final String cqName = "testPutAllWithCQLocalDestroy_0";
+    createServer(server1);
+    createServer(server2);
+    final String host = VM.getHostName();
+    final int port = server2.invoke(() -> getCacheServerPort());
+    createClient(client, port, host);
+    createCQ(client, cqName, cqs[0]);
+
+    int numObjects = 1000;
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+      Map<String, Object> buffer = new HashMap();
+      for (int i = 1; i < numObjects; i++) {
+        Portfolio p = new Portfolio(i);
+        buffer.put("" + i, p);
+      }
+      region.putAll(buffer);
+      assertThat(region.size()).isEqualTo(numObjects - 1);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      cqQuery.executeWithInitialResults();
+    });
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+      // PutAll with entries that do not satisfy the CQ, to generate LOCAL_DESTROY CQ events
+      Map<String, Object> buffer = new HashMap();
+      for (int i = 1; i < numObjects; i++) {
+        Portfolio p = new Portfolio(-1 * i);
+        buffer.put("" + i, p);
+      }
+      region.putAll(buffer);
+      assertThat(region.size()).isEqualTo(numObjects - 1);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      CqQueryTestListener cqListener =
+          (CqQueryTestListener) cqQuery.getCqAttributes().getCqListener();
+      assertThat(cqListener.getTotalEventCount()).isEqualTo(numObjects - 1);
+    });
+
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+
+  @Test
+  public void testRemoveAllWithCQLocalDestroy() {
+    VM server1 = getVM(0);
+    VM server2 = getVM(1);
+    VM client = getVM(2);
+
+    final String cqName = "testRemoveAllWithCQLocalDestroy_0";
+    createServer(server1);
+    createServer(server2);
+    final String host = VM.getHostName();
+    final int port = server2.invoke(() -> getCacheServerPort());
+    createClient(client, port, host);
+    createCQ(client, cqName, cqs[0]);
+
+    int numObjects = 1000;
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+      Map<String, Object> buffer = new HashMap();
+      for (int i = 1; i < numObjects; i++) {
+        Portfolio p = new Portfolio(i);
+        buffer.put("" + i, p);
+      }
+      region.putAll(buffer);
+      assertThat(region.size()).isEqualTo(numObjects - 1);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      cqQuery.executeWithInitialResults();
+    });
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+      Set<String> keys = new HashSet<>();
+      for (int i = 1; i < numObjects; i++) {
+        keys.add("" + i);
+      }
+      // This is to generate LOCAL_DESTROY CQ events
+      region.removeAll(keys);
+      assertThat(region.size()).isEqualTo(0);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      CqQueryTestListener cqListener =
+          (CqQueryTestListener) cqQuery.getCqAttributes().getCqListener();
+      assertThat(cqListener.getTotalEventCount()).isEqualTo(numObjects - 1);
+    });
+
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+
+  @Test
   public void testCQLeakWithPartitionedRegion() throws Exception {
     // creating servers.
     final Host host = Host.getHost(0);