You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2022/04/28 18:58:55 UTC
[geode] branch develop updated: GEODE-10260: make sure message is added before they are processed. (#7628)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new f117a59548 GEODE-10260: make sure message is added before they are processed. (#7628)
f117a59548 is described below
commit f117a59548fe392725616bf9f354748e1ba785d9
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Thu Apr 28 11:58:49 2022 -0700
GEODE-10260: make sure message is added before they are processed. (#7628)
---
.../apache/geode/internal/cache/FilterProfile.java | 31 ++++++----
.../geode/cache/query/cq/CQDistributedTest.java | 71 ++++++++++++++++++----
2 files changed, 78 insertions(+), 24 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
index 3ae0185302..ed52e24190 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
@@ -1846,18 +1846,27 @@ public class FilterProfile implements DataSerializableFixedID {
}
CacheDistributionAdvisor cda = (CacheDistributionAdvisor) r.getDistributionAdvisor();
- CacheDistributionAdvisor.CacheProfile cp =
- (CacheDistributionAdvisor.CacheProfile) cda.getProfile(getSender());
- if (cp == null) { // PR accessors do not keep filter profiles around
- if (logger.isDebugEnabled()) {
- logger.debug(
- "No cache profile to update, adding filter profile message to queue. Message :{}",
- this);
+ CacheDistributionAdvisor.CacheProfile cp;
+
+ // prevent adding the message to queue after we have processed the queue
+ // in CreateRegionReplyProcessor.process
+ synchronized (cda) {
+ cp = (CacheDistributionAdvisor.CacheProfile) cda.getProfile(getSender());
+ if (cp == null) {
+ // only need to hold the lock if cda doesn't have the profile yet. This makes sure
+ // we add the message to the queue before they are processed
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "No cache profile to update, adding filter profile message to queue. Message :{}",
+ this);
+ }
+ FilterProfile localFP = ((LocalRegion) r).getFilterProfile();
+ localFP.addToFilterProfileQueue(getSender(), this);
+ dm.getCancelCriterion().checkCancelInProgress(null);
}
- FilterProfile localFP = ((LocalRegion) r).getFilterProfile();
- localFP.addToFilterProfileQueue(getSender(), this);
- dm.getCancelCriterion().checkCancelInProgress(null);
- } else {
+ }
+
+ if (cp != null) {
cp.hasCacheServer = true;
FilterProfile fp = cp.filterProfile;
if (fp == null) { // PR accessors do not keep filter profiles around
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
index 4bb7f2fa21..8c35047951 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
@@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.Properties;
+import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Rule;
@@ -41,9 +42,11 @@ import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@Category({ClientSubscriptionTest.class})
@@ -58,24 +61,26 @@ public class CQDistributedTest implements Serializable {
private TestCqListener testListener;
private TestCqListener2 testListener2;
- private Region region;
+ private Region<Integer, Portfolio> region;
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
@Before
public void before() throws Exception {
- locator = clusterStartupRule.startLocatorVM(1, new Properties());
+ locator = clusterStartupRule.startLocatorVM(0, new Properties());
Integer locator1Port = locator.getPort();
- server = clusterStartupRule.startServerVM(3, locator1Port);
+ server = clusterStartupRule.startServerVM(1, locator1Port);
createServerRegion(server, RegionShortcut.PARTITION);
- server2 = clusterStartupRule.startServerVM(4, locator1Port);
+ server2 = clusterStartupRule.startServerVM(2, locator1Port);
createServerRegion(server2, RegionShortcut.PARTITION);
ClientCache clientCache = createClientCache(locator1Port);
region =
- clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
+ clientCache
+ .<Integer, Portfolio>createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+ .create("region");
qs = clientCache.getQueryService();
CqAttributesFactory cqaf = new CqAttributesFactory();
@@ -87,12 +92,52 @@ public class CQDistributedTest implements Serializable {
cqa = cqaf.create();
}
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ @Test
+ // Before the fix, this test will reproduce pretty consistently if we put a sleep statement before
+ // we do localFP.addToFilterProfileQueue in FilterProfile$OperationMessage.process().
+ public void filterProfileUpdate() throws Exception {
+ MemberVM newServer = clusterStartupRule.startServerVM(3, locator.getPort());
+
+ // create 10 cqs to begin with
+ for (int i = 0; i < 10; i++) {
+ qs.newCq("query_" + i, "Select * from " + SEPARATOR + "region r where r.ID = " + i, cqa)
+ .execute();
+ }
+
+ AsyncInvocation regionCreate = newServer.invokeAsync(() -> {
+ ClusterStartupRule.memberStarter.createRegion(RegionShortcut.PARTITION, "region");
+ });
+
+ Future<Void> createCqs = executor.submit(() -> {
+ for (int i = 10; i < 100; i++) {
+ qs.newCq("query_" + i, "Select * from " + SEPARATOR + "region r where r.ID = " + i, cqa)
+ .execute();
+ }
+ });
+
+ regionCreate.await();
+ createCqs.get();
+
+ newServer.invoke(() -> {
+ Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+ for (int i = 0; i < 100; i++) {
+ regionOnServer.put(i, new Portfolio(i));
+ }
+ });
+
+ // make sure all cq's will get its own event, so total events = total # of cqs.
+ await().untilAsserted(() -> assertThat(testListener.onEventCalls).isEqualTo(100));
+ }
+
@Test
public void cqUsingModShouldFireEventsWhenFilterCriteriaIsMet() throws Exception {
qs.newCq("Select * from " + SEPARATOR + "region r where r.ID % 2 = 1", cqa).execute();
server.invoke(() -> {
- Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
regionOnServer.put(0, new Portfolio(0));
regionOnServer.put(1, new Portfolio(1));
regionOnServer.put(2, new Portfolio(2));
@@ -108,7 +153,7 @@ public class CQDistributedTest implements Serializable {
public void cqUsingPlusShouldFireEventsWhenFilterCriteriaIsMet() throws Exception {
qs.newCq("Select * from " + SEPARATOR + "region r where r.ID + 3 > 4", cqa).execute();
server.invoke(() -> {
- Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
regionOnServer.put(0, new Portfolio(0));
regionOnServer.put(1, new Portfolio(1));
regionOnServer.put(2, new Portfolio(2));
@@ -124,7 +169,7 @@ public class CQDistributedTest implements Serializable {
public void cqUsingSubtractShouldFireEventsWhenFilterCriteriaIsMet() throws Exception {
qs.newCq("Select * from " + SEPARATOR + "region r where r.ID - 3 < 0", cqa).execute();
server.invoke(() -> {
- Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
regionOnServer.put(0, new Portfolio(0));
regionOnServer.put(1, new Portfolio(1));
regionOnServer.put(2, new Portfolio(2));
@@ -140,7 +185,7 @@ public class CQDistributedTest implements Serializable {
public void cqUsingDivideShouldFireEventsWhenFilterCriteriaIsMet() throws Exception {
qs.newCq("Select * from " + SEPARATOR + "region r where r.ID / 2 = 1", cqa).execute();
server.invoke(() -> {
- Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
regionOnServer.put(0, new Portfolio(0));
regionOnServer.put(1, new Portfolio(1));
regionOnServer.put(2, new Portfolio(2));
@@ -156,7 +201,7 @@ public class CQDistributedTest implements Serializable {
public void cqUsingMultiplyShouldFireEventsWhenFilterCriteriaIsMet() throws Exception {
qs.newCq("Select * from " + SEPARATOR + "region r where r.ID * 2 > 3", cqa).execute();
server.invoke(() -> {
- Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
regionOnServer.put(0, new Portfolio(0));
regionOnServer.put(1, new Portfolio(1));
regionOnServer.put(2, new Portfolio(2));
@@ -172,7 +217,7 @@ public class CQDistributedTest implements Serializable {
public void cqExecuteWithInitialResultsWithValuesMatchingPrimaryKeyShouldNotThrowClassCastException()
throws Exception {
server.invoke(() -> {
- Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
ClusterStartupRule.getCache().getQueryService().createKeyIndex("PrimaryKeyIndex", "ID",
SEPARATOR + "region");
regionOnServer.put(0, new Portfolio(0));
@@ -193,7 +238,7 @@ public class CQDistributedTest implements Serializable {
qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();
server.invoke(() -> {
- Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
final CacheTransactionManager txMgr =
ClusterStartupRule.getCache().getCacheTransactionManager();
@@ -222,7 +267,7 @@ public class CQDistributedTest implements Serializable {
qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();
server.invoke(() -> {
- Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+ Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
// CREATE new entry
regionOnServer.put(0, new Portfolio(1));