You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jc...@apache.org on 2021/03/11 19:20:54 UTC
[geode] branch develop updated: GEODE-9016: Fix the NPE for PutAll
with CQ LOCAL_DESTROY message type (#6104)
This is an automated email from the ASF dual-hosted git repository.
jchen21 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 4e0c8aa GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104)
4e0c8aa is described below
commit 4e0c8aa6937ad2b5935a11994138381fa29a8644
Author: Jianxia Chen <11...@users.noreply.github.com>
AuthorDate: Thu Mar 11 11:19:39 2021 -0800
GEODE-9016: Fix the NPE for PutAll with CQ LOCAL_DESTROY message type (#6104)
For PutAll and RemoveAll, when removing destroy token from CQ result keys, use the keys in the individual entry events, instead of using the key in the base event.
---
.../internal/cache/DistributedCacheOperation.java | 25 ++--
.../internal/cache/DistributedPutAllOperation.java | 20 +++
.../cache/DistributedRemoveAllOperation.java | 20 +++
.../cache/DistributedCacheOperationTest.java | 24 ++++
.../cache/DistributedPutAllOperationTest.java | 51 ++++++++
.../cache/DistributedRemoveAllOperationTest.java | 51 ++++++++
.../dunit/PartitionedRegionCqQueryDUnitTest.java | 136 +++++++++++++++++++++
7 files changed, 317 insertions(+), 10 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 3e55045..b2f2767 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -719,7 +719,7 @@ public abstract class DistributedCacheOperation {
private void removeDestroyTokensFromCqResultKeys(FilterRoutingInfo filterRouting) {
for (InternalDistributedMember m : filterRouting.getMembers()) {
FilterInfo filterInfo = filterRouting.getFilterInfo(m);
- if (filterInfo.getCQs() == null) {
+ if (filterInfo == null || filterInfo.getCQs() == null) {
continue;
}
@@ -734,15 +734,20 @@ public abstract class DistributedCacheOperation {
for (Object value : cf.filterProfile.getCqMap().values()) {
ServerCQ cq = (ServerCQ) value;
- for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
- Long cqID = e.getKey();
- // For the CQs satisfying the event with destroy CQEvent, remove
- // the entry form CQ cache.
- if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
- && (e.getValue().equals(MessageType.LOCAL_DESTROY))) {
- cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
- }
- }
+ doRemoveDestroyTokensFromCqResultKeys(filterInfo, cq);
+ }
+ }
+ }
+
+ void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+ for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+ Long cqID = e.getKey();
+ // For the CQs satisfying the event with destroy CQEvent, remove
+ // the entry form CQ cache.
+ if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+ && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)
+ && ((EntryOperation) event).getKey() != null) {
+ cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
index 9a9737e..842d7cd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
@@ -39,6 +40,7 @@ import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException;
import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -46,6 +48,7 @@ import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.partitioned.PutAllPRMessage;
+import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tx.RemotePutAllMessage;
@@ -814,6 +817,23 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation {
return consolidated;
}
+ @Override
+ void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+ for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+ Long cqID = e.getKey();
+ // For the CQs satisfying the event with destroy CQEvent, remove
+ // the entry from CQ cache.
+ for (int i = 0; i < this.putAllData.length; i++) {
+ @Unretained
+ EntryEventImpl entryEvent = getEventForPosition(i);
+ if (entryEvent != null && entryEvent.getKey() != null && cq != null
+ && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+ && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
+ cq.removeFromCqResultKeys(entryEvent.getKey(), true);
+ }
+ }
+ }
+ }
@Override
protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
index f76c701..942a4dd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.Logger;
@@ -33,6 +34,7 @@ import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException;
import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.InternalDataSerializer;
@@ -40,6 +42,7 @@ import org.apache.geode.internal.cache.DistributedPutAllOperation.EntryVersionsL
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.partitioned.RemoveAllPRMessage;
+import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tx.RemoteRemoveAllMessage;
@@ -584,6 +587,23 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation {
return consolidated;
}
+ @Override
+ void doRemoveDestroyTokensFromCqResultKeys(FilterInfo filterInfo, ServerCQ cq) {
+ for (Map.Entry<Long, Integer> e : filterInfo.getCQs().entrySet()) {
+ Long cqID = e.getKey();
+ // For the CQs satisfying the event with destroy CQEvent, remove
+ // the entry from CQ cache.
+ for (int i = 0; i < this.removeAllData.length; i++) {
+ @Unretained
+ EntryEventImpl entryEvent = getEventForPosition(i);
+ if (entryEvent != null && entryEvent.getKey() != null && cq != null
+ && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
+ && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
+ cq.removeFromCqResultKeys(entryEvent.getKey(), true);
+ }
+ }
+ }
+ }
@Override
protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
index a97fecc..606fac6 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
@@ -17,6 +17,8 @@ package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -28,9 +30,11 @@ import java.util.Map;
import org.junit.Test;
import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.DistributedCacheOperation.CacheOperationMessage;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
+import org.apache.geode.internal.cache.tier.MessageType;
public class DistributedCacheOperationTest {
@@ -106,4 +110,24 @@ public class DistributedCacheOperationTest {
throw new RuntimeException("boom");
}
}
+
+ @Test
+ public void testDoRemoveDestroyTokensFromCqResultKeys() {
+ Object key = new Object();
+ HashMap hashMap = new HashMap();
+ hashMap.put(1L, MessageType.LOCAL_DESTROY);
+ EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+ ServerCQ serverCQ = mock(ServerCQ.class);
+ FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+ DistributedCacheOperation distributedCacheOperation =
+ new DestroyOperation(baseEvent);
+ when(baseEvent.getKey()).thenReturn(key);
+ when(filterInfo.getCQs()).thenReturn(hashMap);
+ when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+ doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+ distributedCacheOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+ verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
index a06920e..da0c1cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedPutAllOperationTest.java
@@ -15,11 +15,25 @@
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.HashMap;
+
import org.junit.Test;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.tier.MessageType;
+
public class DistributedPutAllOperationTest {
@@ -33,4 +47,41 @@ public class DistributedPutAllOperationTest {
assertThat(mockDistributedPutAllOperation.getBaseEvent()).isSameAs(mockEntryEventImpl);
}
+
+ @Test
+ public void testDoRemoveDestroyTokensFromCqResultKeys() {
+ EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+ EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+ BucketRegion bucketRegion = mock(BucketRegion.class);
+ InternalCache internalCache = mock(InternalCache.class);
+ RegionAttributes regionAttributes = mock(RegionAttributes.class);
+ FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+ CqService cqService = mock(CqService.class);
+ InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+ PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+ ServerCQ serverCQ = mock(ServerCQ.class);
+ int putAllPRDataSize = 1;
+ DistributedPutAllOperation distributedPutAllOperation =
+ new DistributedPutAllOperation(baseEvent, putAllPRDataSize, false);
+ Object key = new Object();
+ when(entryEvent.getKey()).thenReturn(key);
+ distributedPutAllOperation.addEntry(entryEvent);
+ HashMap hashMap = new HashMap();
+ hashMap.put(1L, MessageType.LOCAL_DESTROY);
+ when(filterInfo.getCQs()).thenReturn(hashMap);
+ when(baseEvent.getRegion()).thenReturn(bucketRegion);
+ when(bucketRegion.getAttributes()).thenReturn(regionAttributes);
+ when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT);
+ when(bucketRegion.getCache()).thenReturn(internalCache);
+ when(bucketRegion.getPartitionedRegion()).thenReturn(partitionedRegion);
+ when(bucketRegion.getKeyInfo(any(), any(), any())).thenReturn(new KeyInfo(key, null, null));
+ when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+ when(internalCache.getCqService()).thenReturn(cqService);
+ when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+ doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+ distributedPutAllOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+ verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
index bd84116..7f2eb46 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRemoveAllOperationTest.java
@@ -15,11 +15,25 @@
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.HashMap;
+
import org.junit.Test;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.tier.MessageType;
+
public class DistributedRemoveAllOperationTest {
@@ -33,4 +47,41 @@ public class DistributedRemoveAllOperationTest {
assertThat(mockDistributedRemoveAllOperation.getBaseEvent()).isSameAs(mockEntryEventImpl);
}
+
+ @Test
+ public void testDoRemoveDestroyTokensFromCqResultKeys() {
+ EntryEventImpl baseEvent = mock(EntryEventImpl.class);
+ EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+ BucketRegion bucketRegion = mock(BucketRegion.class);
+ InternalCache internalCache = mock(InternalCache.class);
+ RegionAttributes regionAttributes = mock(RegionAttributes.class);
+ InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+ FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+ CqService cqService = mock(CqService.class);
+ PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+ ServerCQ serverCQ = mock(ServerCQ.class);
+ int removeAllPRDataSize = 1;
+ DistributedRemoveAllOperation distributedRemoveAllOperation =
+ new DistributedRemoveAllOperation(baseEvent, removeAllPRDataSize, false);
+ Object key = new Object();
+ when(entryEvent.getKey()).thenReturn(key);
+ distributedRemoveAllOperation.addEntry(entryEvent);
+ HashMap hashMap = new HashMap();
+ hashMap.put(1L, MessageType.LOCAL_DESTROY);
+ when(filterInfo.getCQs()).thenReturn(hashMap);
+ when(baseEvent.getRegion()).thenReturn(bucketRegion);
+ when(bucketRegion.getAttributes()).thenReturn(regionAttributes);
+ when(bucketRegion.getPartitionedRegion()).thenReturn(partitionedRegion);
+ when(bucketRegion.getCache()).thenReturn(internalCache);
+ when(bucketRegion.getKeyInfo(any(), any(), any())).thenReturn(new KeyInfo(key, null, null));
+ when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT);
+ when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+ when(internalCache.getCqService()).thenReturn(cqService);
+ when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+ doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
+
+ distributedRemoveAllOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);
+
+ verify(serverCQ, times(1)).removeFromCqResultKeys(key, true);
+ }
}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
index 3f520f6..6d2e979 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
@@ -22,9 +22,14 @@ import static org.apache.geode.test.dunit.Assert.assertNotNull;
import static org.apache.geode.test.dunit.Assert.assertNull;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -121,6 +126,137 @@ public class PartitionedRegionCqQueryDUnitTest extends JUnit4CacheTestCase {
private static int bridgeServerPort;
@Test
+ public void testPutAllWithCQLocalDestroy() {
+ VM server1 = getVM(0);
+ VM server2 = getVM(1);
+ VM client = getVM(2);
+
+ final String cqName = "testPutAllWithCQLocalDestroy_0";
+ createServer(server1);
+ createServer(server2);
+ final String host = VM.getHostName();
+ final int port = server2.invoke(() -> getCacheServerPort());
+ createClient(client, port, host);
+ createCQ(client, cqName, cqs[0]);
+
+ int numObjects = 1000;
+
+ server1.invoke(() -> {
+ Region<String, Object> region =
+ getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+ Map<String, Object> buffer = new HashMap();
+ for (int i = 1; i < numObjects; i++) {
+ Portfolio p = new Portfolio(i);
+ buffer.put("" + i, p);
+ }
+ region.putAll(buffer);
+ assertThat(region.size()).isEqualTo(numObjects - 1);
+ });
+
+ client.invoke(() -> {
+ QueryService cqService = getCache().getQueryService();
+ CqQuery cqQuery = cqService.getCq(cqName);
+ assertThat(cqQuery)
+ .withFailMessage("Failed to get CQ " + cqName)
+ .isNotNull();
+ cqQuery.executeWithInitialResults();
+ });
+
+ server1.invoke(() -> {
+ Region<String, Object> region =
+ getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+ // PutAll with entries that do not satisfy CQ. This is to generate LOCAL_DESTROY CQ event
+ Map<String, Object> buffer = new HashMap();
+ for (int i = 1; i < numObjects; i++) {
+ Portfolio p = new Portfolio(-1 * i);
+ buffer.put("" + i, p);
+ }
+ region.putAll(buffer);
+ assertThat(region.size()).isEqualTo(numObjects - 1);
+ });
+
+ client.invoke(() -> {
+ QueryService cqService = getCache().getQueryService();
+ CqQuery cqQuery = cqService.getCq(cqName);
+ assertThat(cqQuery)
+ .withFailMessage("Failed to get CQ " + cqName)
+ .isNotNull();
+ CqQueryTestListener cqListener =
+ (CqQueryTestListener) cqQuery.getCqAttributes().getCqListener();
+ assertThat(cqListener.getTotalEventCount()).isEqualTo(numObjects - 1);
+ });
+
+ cqHelper.closeClient(client);
+ cqHelper.closeServer(server2);
+ cqHelper.closeServer(server1);
+ }
+
+ @Test
+ public void testRemoveAllWithCQLocalDestroy() {
+ VM server1 = getVM(0);
+ VM server2 = getVM(1);
+ VM client = getVM(2);
+
+ final String cqName = "testRemoveAllWithCQLocalDestroy_0";
+ createServer(server1);
+ createServer(server2);
+ final String host = VM.getHostName();
+ final int port = server2.invoke(() -> getCacheServerPort());
+ createClient(client, port, host);
+ createCQ(client, cqName, cqs[0]);
+
+ int numObjects = 1000;
+
+ server1.invoke(() -> {
+ Region<String, Object> region =
+ getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+ Map<String, Object> buffer = new HashMap();
+ for (int i = 1; i < numObjects; i++) {
+ Portfolio p = new Portfolio(i);
+ buffer.put("" + i, p);
+ }
+ region.putAll(buffer);
+ assertThat(region.size()).isEqualTo(numObjects - 1);
+ });
+
+ client.invoke(() -> {
+ QueryService cqService = getCache().getQueryService();
+ CqQuery cqQuery = cqService.getCq(cqName);
+ assertThat(cqQuery)
+ .withFailMessage("Failed to get CQ " + cqName)
+ .isNotNull();
+ cqQuery.executeWithInitialResults();
+ });
+
+ server1.invoke(() -> {
+ Region<String, Object> region =
+ getCache().getRegion(SEPARATOR + "root" + SEPARATOR + regions[0]);
+ Set<String> keys = new HashSet<>();
+ for (int i = 1; i < numObjects; i++) {
+ keys.add("" + i);
+ }
+ // This is to generate LOCAL_DESTROY CQ event
+ region.removeAll(keys);
+ assertThat(region.size()).isEqualTo(0);
+ });
+
+ client.invoke(() -> {
+ QueryService cqService = getCache().getQueryService();
+ CqQuery cqQuery = cqService.getCq(cqName);
+ assertThat(cqQuery)
+ .withFailMessage("Failed to get CQ " + cqName)
+ .isNotNull();
+ CqQueryTestListener cqListener =
+ (CqQueryTestListener) cqQuery.getCqAttributes().getCqListener();
+ assertThat(cqListener.getTotalEventCount()).isEqualTo(numObjects - 1);
+ });
+
+ cqHelper.closeClient(client);
+ cqHelper.closeServer(server2);
+ cqHelper.closeServer(server1);
+ }
+
+ @Test
public void testCQLeakWithPartitionedRegion() throws Exception {
// creating servers.
final Host host = Host.getHost(0);