You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jc...@apache.org on 2021/03/11 22:14:19 UTC

[geode] branch support/1.12 updated: GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104)

This is an automated email from the ASF dual-hosted git repository.

jchen21 pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 592c69f  GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104)
592c69f is described below

commit 592c69f2873ae9265273fa8044574612c2acebba
Author: Jianxia Chen <11...@users.noreply.github.com>
AuthorDate: Thu Mar 11 11:19:39 2021 -0800

    GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104)
    
    For PutAll and RemoveAll, when removing destroy token from CQ result keys, use the keys in the individual entry events, instead of using the key in the base event.
    
    (cherry picked from commit 4e0c8aa6937ad2b5935a11994138381fa29a8644)
---
 .../internal/cache/DistributedCacheOperation.java  |  25 ++--
 .../internal/cache/DistributedPutAllOperation.java |  20 +++
 .../cache/DistributedRemoveAllOperation.java       |  20 +++
 .../cache/DistributedCacheOperationTest.java       |  24 ++++
 .../cache/DistributedPutAllOperationTest.java      |  51 ++++++++
 .../cache/DistributedRemoveAllOperationTest.java   |  51 ++++++++
 .../dunit/PartitionedRegionCqQueryDUnitTest.java   | 136 +++++++++++++++++++++
 7 files changed, 317 insertions(+), 10 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 70f6328..79e0161 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -719,7 +719,7 @@ public abstract class DistributedCacheOperation {
   private void removeDestroyTokensFromCqResultKeys(FilterRoutingInfo filterRouting) {
     for (InternalDistributedMember m : filterRouting.getMembers()) {
       FilterInfo filterInfo = filterRouting.getFilterInfo(m);
-      if (filterInfo.getCQs() == null) {
+      if (filterInfo == null || filterInfo.getCQs() == null) {
         continue;
       }
 
@@ -734,15 +734,20 @@ public abstract class DistributedCacheOperation {
       for (Object value : cf.filterProfile.getCqMap().values()) {
         ServerCQ cq = (ServerCQ) value;
 
-        for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
-          Long cqID = e.getKey();
-          // For the CQs satisfying the event with destroy CQEvent, remove
-          // the entry form CQ cache.
-          if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
-              && (e.getValue().equals(MessageType.LOCAL_DESTROY))) {
-            cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
-          }
-        }
+        doRemoveDestroyTokensFromCqResultKeys(filterInfo, cq);
+      }
+    }
+  }
+
+  void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+    for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+      Long cqID = e.getKey();
+      // For the CQs satisfying the event with destroy CQEvent, remove
+      // the entry from CQ cache.
+      if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+          && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)
+          && ((EntryOperation) event).getKey() != null) {
+        cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
       }
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
index 7026a9d..f0cb89b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
@@ -39,6 +40,7 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException;
 import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -46,6 +48,7 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.partitioned.PutAllPRMessage;
+import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.cache.tx.RemotePutAllMessage;
@@ -814,6 +817,23 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation {
     return consolidated;
   }
 
+  @Override
+  void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+    for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+      Long cqID = e.getKey();
+      // For the CQs satisfying the event with destroy CQEvent, remove
+      // the entry from CQ cache.
+      for (int i = 0; i < this.putAllData.length; i++) {
+        @Unretained
+        EntryEventImpl entryEvent = getEventForPosition(i);
+        if (entryEvent != null && entryEvent.getKey() != null && cq != null
+            && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+            && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
+          cq.removeFromCqResultKeys(entryEvent.getKey(), true);
+        }
+      }
+    }
+  }
 
   @Override
   protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
index 03ea590..5457df2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
@@ -33,6 +34,7 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException;
 import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.InternalDataSerializer;
@@ -40,6 +42,7 @@ import org.apache.geode.internal.cache.DistributedPutAllOperation.EntryVersionsL
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.partitioned.RemoveAllPRMessage;
+import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.cache.tx.RemoteRemoveAllMessage;
@@ -584,6 +587,23 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation {
     return consolidated;
   }
 
+  @Override
+  void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+    for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+      Long cqID = e.getKey();
+      // For the CQs satisfying the event with destroy CQEvent, remove
+      // the entry from CQ cache.
+      for (int i = 0; i < this.removeAllData.length; i++) {
+        @Unretained
+        EntryEventImpl entryEvent = getEventForPosition(i);
+        if (entryEvent != null && entryEvent.getKey() != null && cq != null
+            && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+            && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
+          cq.removeFromCqResultKeys(entryEvent.getKey(), true);
+        }
+      }
+    }
+  }
 
   @Override
   protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
index a97fecc..606fac6 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
@@ -17,6 +17,8 @@ package org.apache.geode.internal.cache;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -28,9 +30,11 @@ import java.util.Map;
 import org.junit.Test;
 
 import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.DistributedCacheOperation.CacheOperationMessage;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
+import org.apache.geode.internal.cache.tier.MessageType;
 
 public class DistributedCacheOperationTest {
 
@@ -106,4 +110,24 @@ public class DistributedCacheOperationTest {
       throw new RuntimeException("boom");
     }
   }
+
+  @Test
+  public void testDoRemoveDestroyTokensFromCqResultKeys() {
+    Object key = new Object();
+    HashMap hashMap = new HashMap();
+    hashMap.put(1L, MessageType.LOCAL_DESTROY);
+    EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+    ServerCQ serverCQ = mock(ServerCQ.class);
+    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+    DistributedCacheOperation distributedCacheOperation =
+        new DestroyOperation(baseEvent);
+    when(baseEvent.getKey()).thenReturn(key);
+    when(filterInfo.getCQs()).thenReturn(hashMap);
+    when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+    doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+    distributedCacheOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+    verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
index a06920e..da0c1cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
@@ -15,11 +15,25 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
+
 import org.junit.Test;
 
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.tier.MessageType;
+
 
 public class DistributedPutAllOperationTest {
 
@@ -33,4 +47,41 @@ public class DistributedPutAllOperationTest {
 
     assertThat(mockDistributedPutAllOperation.getBaseEvent()).isSameAs(mockEntryEventImpl);
   }
+
+  @Test
+  public void testDoRemoveDestroyTokensFromCqResultKeys() {
+    EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    BucketRegion bucketRegion = mock(BucketRegion.class);
+    InternalCache internalCache = mock(InternalCache.class);
+    RegionAttributes regionAttributes = mock(RegionAttributes.class);
+    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+    CqService cqService = mock(CqService.class);
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    ServerCQ serverCQ = mock(ServerCQ.class);
+    int putAllPRDataSize = 1;
+    DistributedPutAllOperation distributedPutAllOperation =
+        new DistributedPutAllOperation(baseEvent, putAllPRDataSize, false);
+    Object key = new Object();
+    when(entryEvent.getKey()).thenReturn(key);
+    distributedPutAllOperation.addEntry(entryEvent);
+    HashMap hashMap = new HashMap();
+    hashMap.put(1L, MessageType.LOCAL_DESTROY);
+    when(filterInfo.getCQs()).thenReturn(hashMap);
+    when(baseEvent.getRegion()).thenReturn(bucketRegion);
+    when(bucketRegion.getAttributes()).thenReturn(regionAttributes);
+    when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT);
+    when(bucketRegion.getCache()).thenReturn(internalCache);
+    when(bucketRegion.getPartitionedRegion()).thenReturn(partitionedRegion);
+    when(bucketRegion.getKeyInfo(any(), any(), any())).thenReturn(new KeyInfo(key, null, null));
+    when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalCache.getCqService()).thenReturn(cqService);
+    when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+    doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+    distributedPutAllOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+    verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
index bd84116..7f2eb46 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
@@ -15,11 +15,25 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
+
 import org.junit.Test;
 
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.tier.MessageType;
+
 
 public class DistributedRemoveAllOperationTest {
 
@@ -33,4 +47,41 @@ public class DistributedRemoveAllOperationTest {
 
     assertThat(mockDistributedRemoveAllOperation.getBaseEvent()).isSameAs(mockEntryEventImpl);
   }
+
+  @Test
+  public void testDoRemoveDestroyTokensFromCqResultKeys() {
+    EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    BucketRegion bucketRegion = mock(BucketRegion.class);
+    InternalCache internalCache = mock(InternalCache.class);
+    RegionAttributes regionAttributes = mock(RegionAttributes.class);
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+    CqService cqService = mock(CqService.class);
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    ServerCQ serverCQ = mock(ServerCQ.class);
+    int removeAllPRDataSize = 1;
+    DistributedRemoveAllOperation distributedRemoveAllOperation =
+        new DistributedRemoveAllOperation(baseEvent, removeAllPRDataSize, false);
+    Object key = new Object();
+    when(entryEvent.getKey()).thenReturn(key);
+    distributedRemoveAllOperation.addEntry(entryEvent);
+    HashMap hashMap = new HashMap();
+    hashMap.put(1L, MessageType.LOCAL_DESTROY);
+    when(filterInfo.getCQs()).thenReturn(hashMap);
+    when(baseEvent.getRegion()).thenReturn(bucketRegion);
+    when(bucketRegion.getAttributes()).thenReturn(regionAttributes);
+    when(bucketRegion.getPartitionedRegion()).thenReturn(partitionedRegion);
+    when(bucketRegion.getCache()).thenReturn(internalCache);
+    when(bucketRegion.getKeyInfo(any(), any(), any())).thenReturn(new KeyInfo(key, null, null));
+    when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT);
+    when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalCache.getCqService()).thenReturn(cqService);
+    when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+    doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+    distributedRemoveAllOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+    verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+  }
 }
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
index a0eb88c..dbb89e9 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
@@ -21,9 +21,14 @@ import static org.apache.geode.test.dunit.Assert.assertNotNull;
 import static org.apache.geode.test.dunit.Assert.assertNull;
 import static org.apache.geode.test.dunit.Assert.assertTrue;
 import static org.apache.geode.test.dunit.Assert.fail;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -114,6 +119,137 @@ public class PartitionedRegionCqQueryDUnitTest extends JUnit4CacheTestCase {
   private static int bridgeServerPort;
 
   @Test
+  public void testPutAllWithCQLocalDestroy() {
+    VM server1 = getVM(0);
+    VM server2 = getVM(1);
+    VM client = getVM(2);
+
+    final String cqName = "testPutAllWithCQLocalDestroy_0";
+    createServer(server1);
+    createServer(server2);
+    final String host = VM.getHostName();
+    final int port = server2.invoke(() -> getCacheServerPort());
+    createClient(client, port, host);
+    createCQ(client, cqName, cqs[0]);
+
+    int numObjects = 1000;
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion("/root/" + regions[0]);
+      Map<String, Object> buffer = new HashMap();
+      for (int i = 1; i < numObjects; i++) {
+        Portfolio p = new Portfolio(i);
+        buffer.put("" + i, p);
+      }
+      region.putAll(buffer);
+      assertThat(region.size()).isEqualTo(numObjects - 1);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      cqQuery.executeWithInitialResults();
+    });
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion("/root/" + regions[0]);
+      // PutAll with entries that do not satisfy the CQ, which generates LOCAL_DESTROY CQ events
+      Map<String, Object> buffer = new HashMap();
+      for (int i = 1; i < numObjects; i++) {
+        Portfolio p = new Portfolio(-1 * i);
+        buffer.put("" + i, p);
+      }
+      region.putAll(buffer);
+      assertThat(region.size()).isEqualTo(numObjects - 1);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      CqQueryTestListener cqListener =
+          (CqQueryTestListener) cqQuery.getCqAttributes().getCqListener();
+      assertThat(cqListener.getTotalEventCount()).isEqualTo(numObjects - 1);
+    });
+
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+
+  @Test
+  public void testRemoveAllWithCQLocalDestroy() {
+    VM server1 = getVM(0);
+    VM server2 = getVM(1);
+    VM client = getVM(2);
+
+    final String cqName = "testRemoveAllWithCQLocalDestroy_0";
+    createServer(server1);
+    createServer(server2);
+    final String host = VM.getHostName();
+    final int port = server2.invoke(() -> getCacheServerPort());
+    createClient(client, port, host);
+    createCQ(client, cqName, cqs[0]);
+
+    int numObjects = 1000;
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion("/root/" + regions[0]);
+      Map<String, Object> buffer = new HashMap();
+      for (int i = 1; i < numObjects; i++) {
+        Portfolio p = new Portfolio(i);
+        buffer.put("" + i, p);
+      }
+      region.putAll(buffer);
+      assertThat(region.size()).isEqualTo(numObjects - 1);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      cqQuery.executeWithInitialResults();
+    });
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion("/root/" + regions[0]);
+      Set<String> keys = new HashSet<>();
+      for (int i = 1; i < numObjects; i++) {
+        keys.add("" + i);
+      }
+      // This generates LOCAL_DESTROY CQ events
+      region.removeAll(keys);
+      assertThat(region.size()).isEqualTo(0);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      CqQueryTestListener cqListener =
+          (CqQueryTestListener) cqQuery.getCqAttributes().getCqListener();
+      assertThat(cqListener.getTotalEventCount()).isEqualTo(numObjects - 1);
+    });
+
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+
+  @Test
   public void testCQLeakWithPartitionedRegion() throws Exception {
     // creating servers.
     final Host host = Host.getHost(0);