You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jc...@apache.org on 2021/03/11 19:20:54 UTC

[geode] branch develop updated: GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104)

This is an automated email from the ASF dual-hosted git repository.

jchen21 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4e0c8aa  GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104)
4e0c8aa is described below

commit 4e0c8aa6937ad2b5935a11994138381fa29a8644
Author: Jianxia Chen <11...@users.noreply.github.com>
AuthorDate: Thu Mar 11 11:19:39 2021 -0800

    GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104)
    
    For PutAll and RemoveAll, when removing destroy tokens from the CQ result keys, use the key of each individual entry event instead of the key of the base event.
---
 .../internal/cache/DistributedCacheOperation.java  |  25 ++--
 .../internal/cache/DistributedPutAllOperation.java |  20 +++
 .../cache/DistributedRemoveAllOperation.java       |  20 +++
 .../cache/DistributedCacheOperationTest.java       |  24 ++++
 .../cache/DistributedPutAllOperationTest.java      |  51 ++++++++
 .../cache/DistributedRemoveAllOperationTest.java   |  51 ++++++++
 .../dunit/PartitionedRegionCqQueryDUnitTest.java   | 136 +++++++++++++++++++++
 7 files changed, 317 insertions(+), 10 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 3e55045..b2f2767 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -719,7 +719,7 @@ public abstract class DistributedCacheOperation {
   private void removeDestroyTokensFromCqResultKeys(FilterRoutingInfo filterRouting) {
     for (InternalDistributedMember m : filterRouting.getMembers()) {
       FilterInfo filterInfo = filterRouting.getFilterInfo(m);
-      if (filterInfo.getCQs() == null) {
+      if (filterInfo == null || filterInfo.getCQs() == null) {
         continue;
       }
 
@@ -734,15 +734,20 @@ public abstract class DistributedCacheOperation {
       for (Object value : cf.filterProfile.getCqMap().values()) {
         ServerCQ cq = (ServerCQ) value;
 
-        for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
-          Long cqID = e.getKey();
-          // For the CQs satisfying the event with destroy CQEvent, remove
-          // the entry form CQ cache.
-          if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
-              && (e.getValue().equals(MessageType.LOCAL_DESTROY))) {
-            cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
-          }
-        }
+        doRemoveDestroyTokensFromCqResultKeys(filterInfo, cq);
+      }
+    }
+  }
+
+  void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+    for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+      Long cqID = e.getKey();
+      // For the CQs satisfying the event with destroy CQEvent, remove
+      // the entry from CQ cache.
+      if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+          && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)
+          && ((EntryOperation) event).getKey() != null) {
+        cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
       }
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
index 9a9737e..842d7cd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
@@ -39,6 +40,7 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException;
 import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -46,6 +48,7 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.partitioned.PutAllPRMessage;
+import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.cache.tx.RemotePutAllMessage;
@@ -814,6 +817,23 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation {
     return consolidated;
   }
 
+  @Override
+  void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+    for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+      Long cqID = e.getKey();
+      // For the CQs satisfying the event with destroy CQEvent, remove
+      // the entry from CQ cache.
+      for (int i = 0; i < this.putAllData.length; i++) {
+        @Unretained
+        EntryEventImpl entryEvent = getEventForPosition(i);
+        if (entryEvent != null && entryEvent.getKey() != null && cq != null
+            && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+            && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
+          cq.removeFromCqResultKeys(entryEvent.getKey(), true);
+        }
+      }
+    }
+  }
 
   @Override
   protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
index f76c701..942a4dd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
@@ -33,6 +34,7 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException;
 import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.InternalDataSerializer;
@@ -40,6 +42,7 @@ import org.apache.geode.internal.cache.DistributedPutAllOperation.EntryVersionsL
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.partitioned.RemoveAllPRMessage;
+import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.cache.tx.RemoteRemoveAllMessage;
@@ -584,6 +587,23 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation {
     return consolidated;
   }
 
+  @Override
+  void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+    for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+      Long cqID = e.getKey();
+      // For the CQs satisfying the event with destroy CQEvent, remove
+      // the entry from CQ cache.
+      for (int i = 0; i < this.removeAllData.length; i++) {
+        @Unretained
+        EntryEventImpl entryEvent = getEventForPosition(i);
+        if (entryEvent != null && entryEvent.getKey() != null && cq != null
+            && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+            && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
+          cq.removeFromCqResultKeys(entryEvent.getKey(), true);
+        }
+      }
+    }
+  }
 
   @Override
   protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
index a97fecc..606fac6 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
@@ -17,6 +17,8 @@ package org.apache.geode.internal.cache;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -28,9 +30,11 @@ import java.util.Map;
 import org.junit.Test;
 
 import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.DistributedCacheOperation.CacheOperationMessage;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
+import org.apache.geode.internal.cache.tier.MessageType;
 
 public class DistributedCacheOperationTest {
 
@@ -106,4 +110,24 @@ public class DistributedCacheOperationTest {
       throw new RuntimeException("boom");
     }
   }
+
+  @Test
+  public void testDoRemoveDestroyTokensFromCqResultKeys() {
+    Object key = new Object();
+    HashMap hashMap = new HashMap();
+    hashMap.put(1L, MessageType.LOCAL_DESTROY);
+    EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+    ServerCQ serverCQ = mock(ServerCQ.class);
+    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+    DistributedCacheOperation distributedCacheOperation =
+        new DestroyOperation(baseEvent);
+    when(baseEvent.getKey()).thenReturn(key);
+    when(filterInfo.getCQs()).thenReturn(hashMap);
+    when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+    doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+    distributedCacheOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+    verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
index a06920e..da0c1cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
@@ -15,11 +15,25 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
+
 import org.junit.Test;
 
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.tier.MessageType;
+
 
 public class DistributedPutAllOperationTest {
 
@@ -33,4 +47,41 @@ public class DistributedPutAllOperationTest {
 
     assertThat(mockDistributedPutAllOperation.getBaseEvent()).isSameAs(mockEntryEventImpl);
   }
+
+  @Test
+  public void testDoRemoveDestroyTokensFromCqResultKeys() {
+    EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    BucketRegion bucketRegion = mock(BucketRegion.class);
+    InternalCache internalCache = mock(InternalCache.class);
+    RegionAttributes regionAttributes = mock(RegionAttributes.class);
+    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+    CqService cqService = mock(CqService.class);
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    ServerCQ serverCQ = mock(ServerCQ.class);
+    int putAllPRDataSize = 1;
+    DistributedPutAllOperation distributedPutAllOperation =
+        new DistributedPutAllOperation(baseEvent, putAllPRDataSize, false);
+    Object key = new Object();
+    when(entryEvent.getKey()).thenReturn(key);
+    distributedPutAllOperation.addEntry(entryEvent);
+    HashMap hashMap = new HashMap();
+    hashMap.put(1L, MessageType.LOCAL_DESTROY);
+    when(filterInfo.getCQs()).thenReturn(hashMap);
+    when(baseEvent.getRegion()).thenReturn(bucketRegion);
+    when(bucketRegion.getAttributes()).thenReturn(regionAttributes);
+    when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT);
+    when(bucketRegion.getCache()).thenReturn(internalCache);
+    when(bucketRegion.getPartitionedRegion()).thenReturn(partitionedRegion);
+    when(bucketRegion.getKeyInfo(any(), any(), any())).thenReturn(new KeyInfo(key, null, null));
+    when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalCache.getCqService()).thenReturn(cqService);
+    when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+    doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+    distributedPutAllOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+    verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
index bd84116..7f2eb46 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
@@ -15,11 +15,25 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
+
 import org.junit.Test;
 
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.tier.MessageType;
+
 
 public class DistributedRemoveAllOperationTest {
 
@@ -33,4 +47,41 @@ public class DistributedRemoveAllOperationTest {
 
     assertThat(mockDistributedRemoveAllOperation.getBaseEvent()).isSameAs(mockEntryEventImpl);
   }
+
+  @Test
+  public void testDoRemoveDestroyTokensFromCqResultKeys() {
+    EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    BucketRegion bucketRegion = mock(BucketRegion.class);
+    InternalCache internalCache = mock(InternalCache.class);
+    RegionAttributes regionAttributes = mock(RegionAttributes.class);
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+    CqService cqService = mock(CqService.class);
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    ServerCQ serverCQ = mock(ServerCQ.class);
+    int removeAllPRDataSize = 1;
+    DistributedRemoveAllOperation distributedRemoveAllOperation =
+        new DistributedRemoveAllOperation(baseEvent, removeAllPRDataSize, false);
+    Object key = new Object();
+    when(entryEvent.getKey()).thenReturn(key);
+    distributedRemoveAllOperation.addEntry(entryEvent);
+    HashMap hashMap = new HashMap();
+    hashMap.put(1L, MessageType.LOCAL_DESTROY);
+    when(filterInfo.getCQs()).thenReturn(hashMap);
+    when(baseEvent.getRegion()).thenReturn(bucketRegion);
+    when(bucketRegion.getAttributes()).thenReturn(regionAttributes);
+    when(bucketRegion.getPartitionedRegion()).thenReturn(partitionedRegion);
+    when(bucketRegion.getCache()).thenReturn(internalCache);
+    when(bucketRegion.getKeyInfo(any(), any(), any())).thenReturn(new KeyInfo(key, null, null));
+    when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT);
+    when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalCache.getCqService()).thenReturn(cqService);
+    when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+    doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+    distributedRemoveAllOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+    verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+  }
 }
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
index 3f520f6..6d2e979 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
@@ -22,9 +22,14 @@ import static org.apache.geode.test.dunit.Assert.assertNotNull;
 import static org.apache.geode.test.dunit.Assert.assertNull;
 import static org.apache.geode.test.dunit.Assert.assertTrue;
 import static org.apache.geode.test.dunit.Assert.fail;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -121,6 +126,137 @@ public class PartitionedRegionCqQueryDUnitTest extends JUnit4CacheTestCase {
   private static int bridgeServerPort;
 
   @Test
+  public void testPutAllWithCQLocalDestroy() {
+    VM server1 = getVM(0);
+    VM server2 = getVM(1);
+    VM client = getVM(2);
+
+    final String cqName = "testPutAllWithCQLocalDestroy_0";
+    createServer(server1);
+    createServer(server2);
+    final String host = VM.getHostName();
+    final int port = server2.invoke(() -> getCacheServerPort());
+    createClient(client, port, host);
+    createCQ(client, cqName, cqs[0]);
+
+    int numObjects = 1000;
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+      Map<String, Object> buffer = new HashMap();
+      for (int i = 1; i < numObjects; i++) {
+        Portfolio p = new Portfolio(i);
+        buffer.put("" + i, p);
+      }
+      region.putAll(buffer);
+      assertThat(region.size()).isEqualTo(numObjects - 1);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      cqQuery.executeWithInitialResults();
+    });
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+      // PutAll with entries that do not satisfy CQ. This is to generate LOCAL_DESTROY CQ event
+      Map<String, Object> buffer = new HashMap();
+      for (int i = 1; i < numObjects; i++) {
+        Portfolio p = new Portfolio(-1 * i);
+        buffer.put("" + i, p);
+      }
+      region.putAll(buffer);
+      assertThat(region.size()).isEqualTo(numObjects - 1);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      CqQueryTestListener cqListener =
+          (CqQueryTestListener) cqQuery.getCqAttributes().getCqListener();
+      assertThat(cqListener.getTotalEventCount()).isEqualTo(numObjects - 1);
+    });
+
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+
+  @Test
+  public void testRemoveAllWithCQLocalDestroy() {
+    VM server1 = getVM(0);
+    VM server2 = getVM(1);
+    VM client = getVM(2);
+
+    final String cqName = "testRemoveAllWithCQLocalDestroy_0";
+    createServer(server1);
+    createServer(server2);
+    final String host = VM.getHostName();
+    final int port = server2.invoke(() -> getCacheServerPort());
+    createClient(client, port, host);
+    createCQ(client, cqName, cqs[0]);
+
+    int numObjects = 1000;
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+      Map<String, Object> buffer = new HashMap();
+      for (int i = 1; i < numObjects; i++) {
+        Portfolio p = new Portfolio(i);
+        buffer.put("" + i, p);
+      }
+      region.putAll(buffer);
+      assertThat(region.size()).isEqualTo(numObjects - 1);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      cqQuery.executeWithInitialResults();
+    });
+
+    server1.invoke(() -> {
+      Region<String, Object> region =
+          getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+      Set<String> keys = new HashSet<>();
+      for (int i = 1; i < numObjects; i++) {
+        keys.add("" + i);
+      }
+      // This is to generate LOCAL_DESTROY CQ event
+      region.removeAll(keys);
+      assertThat(region.size()).isEqualTo(0);
+    });
+
+    client.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cqQuery = cqService.getCq(cqName);
+      assertThat(cqQuery)
+          .withFailMessage("Failed to get CQ " + cqName)
+          .isNotNull();
+      CqQueryTestListener cqListener =
+          (CqQueryTestListener) cqQuery.getCqAttributes().getCqListener();
+      assertThat(cqListener.getTotalEventCount()).isEqualTo(numObjects - 1);
+    });
+
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+
+  @Test
   public void testCQLeakWithPartitionedRegion() throws Exception {
     // creating servers.
     final Host host = Host.getHost(0);