You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2022/04/28 18:58:55 UTC

[geode] branch develop updated: GEODE-10260: make sure message is added before they are processed. (#7628)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f117a59548 GEODE-10260: make sure message is added before they are processed. (#7628)
f117a59548 is described below

commit f117a59548fe392725616bf9f354748e1ba785d9
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Thu Apr 28 11:58:49 2022 -0700

    GEODE-10260: make sure message is added before they are processed. (#7628)
---
 .../apache/geode/internal/cache/FilterProfile.java | 31 ++++++----
 .../geode/cache/query/cq/CQDistributedTest.java    | 71 ++++++++++++++++++----
 2 files changed, 78 insertions(+), 24 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
index 3ae0185302..ed52e24190 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
@@ -1846,18 +1846,27 @@ public class FilterProfile implements DataSerializableFixedID {
         }
 
         CacheDistributionAdvisor cda = (CacheDistributionAdvisor) r.getDistributionAdvisor();
-        CacheDistributionAdvisor.CacheProfile cp =
-            (CacheDistributionAdvisor.CacheProfile) cda.getProfile(getSender());
-        if (cp == null) { // PR accessors do not keep filter profiles around
-          if (logger.isDebugEnabled()) {
-            logger.debug(
-                "No cache profile to update, adding filter profile message to queue. Message :{}",
-                this);
+        CacheDistributionAdvisor.CacheProfile cp;
+
+        // prevent adding the message to queue after we have processed the queue
+        // in CreateRegionReplyProcessor.process
+        synchronized (cda) {
+          cp = (CacheDistributionAdvisor.CacheProfile) cda.getProfile(getSender());
+          if (cp == null) {
+            // only need to hold the lock if cda doesn't have the profile yet. This makes sure
+            // we add the message to the queue before the queue is processed
+            if (logger.isDebugEnabled()) {
+              logger.debug(
+                  "No cache profile to update, adding filter profile message to queue. Message :{}",
+                  this);
+            }
+            FilterProfile localFP = ((LocalRegion) r).getFilterProfile();
+            localFP.addToFilterProfileQueue(getSender(), this);
+            dm.getCancelCriterion().checkCancelInProgress(null);
           }
-          FilterProfile localFP = ((LocalRegion) r).getFilterProfile();
-          localFP.addToFilterProfileQueue(getSender(), this);
-          dm.getCancelCriterion().checkCancelInProgress(null);
-        } else {
+        }
+
+        if (cp != null) {
           cp.hasCacheServer = true;
           FilterProfile fp = cp.filterProfile;
           if (fp == null) { // PR accessors do not keep filter profiles around
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
index 4bb7f2fa21..8c35047951 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
@@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.Serializable;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import org.junit.Before;
 import org.junit.Rule;
@@ -41,9 +42,11 @@ import org.apache.geode.cache.query.CqListener;
 import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 
 @Category({ClientSubscriptionTest.class})
@@ -58,24 +61,26 @@ public class CQDistributedTest implements Serializable {
   private TestCqListener testListener;
   private TestCqListener2 testListener2;
 
-  private Region region;
+  private Region<Integer, Portfolio> region;
 
   @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
 
   @Before
   public void before() throws Exception {
-    locator = clusterStartupRule.startLocatorVM(1, new Properties());
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
     Integer locator1Port = locator.getPort();
-    server = clusterStartupRule.startServerVM(3, locator1Port);
+    server = clusterStartupRule.startServerVM(1, locator1Port);
     createServerRegion(server, RegionShortcut.PARTITION);
 
-    server2 = clusterStartupRule.startServerVM(4, locator1Port);
+    server2 = clusterStartupRule.startServerVM(2, locator1Port);
     createServerRegion(server2, RegionShortcut.PARTITION);
 
     ClientCache clientCache = createClientCache(locator1Port);
     region =
-        clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
+        clientCache
+            .<Integer, Portfolio>createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+            .create("region");
 
     qs = clientCache.getQueryService();
     CqAttributesFactory cqaf = new CqAttributesFactory();
@@ -87,12 +92,52 @@ public class CQDistributedTest implements Serializable {
     cqa = cqaf.create();
   }
 
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Test
+  // Before the fix, this test would reproduce the issue pretty consistently if a sleep statement
+  // is placed before localFP.addToFilterProfileQueue in FilterProfile$OperationMessage.process().
+  public void filterProfileUpdate() throws Exception {
+    MemberVM newServer = clusterStartupRule.startServerVM(3, locator.getPort());
+
+    // create 10 cqs to begin with
+    for (int i = 0; i < 10; i++) {
+      qs.newCq("query_" + i, "Select * from " + SEPARATOR + "region r where r.ID = " + i, cqa)
+          .execute();
+    }
+
+    AsyncInvocation regionCreate = newServer.invokeAsync(() -> {
+      ClusterStartupRule.memberStarter.createRegion(RegionShortcut.PARTITION, "region");
+    });
+
+    Future<Void> createCqs = executor.submit(() -> {
+      for (int i = 10; i < 100; i++) {
+        qs.newCq("query_" + i, "Select * from " + SEPARATOR + "region r where r.ID = " + i, cqa)
+            .execute();
+      }
+    });
+
+    regionCreate.await();
+    createCqs.get();
+
+    newServer.invoke(() -> {
+      Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      for (int i = 0; i < 100; i++) {
+        regionOnServer.put(i, new Portfolio(i));
+      }
+    });
+
+    // make sure each cq gets its own event, so total events = total # of cqs.
+    await().untilAsserted(() -> assertThat(testListener.onEventCalls).isEqualTo(100));
+  }
+
   @Test
   public void cqUsingModShouldFireEventsWhenFilterCriteriaIsMet() throws Exception {
     qs.newCq("Select * from " + SEPARATOR + "region r where r.ID % 2 = 1", cqa).execute();
 
     server.invoke(() -> {
-      Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
       regionOnServer.put(0, new Portfolio(0));
       regionOnServer.put(1, new Portfolio(1));
       regionOnServer.put(2, new Portfolio(2));
@@ -108,7 +153,7 @@ public class CQDistributedTest implements Serializable {
   public void cqUsingPlusShouldFireEventsWhenFilterCriteriaIsMet() throws Exception {
     qs.newCq("Select * from " + SEPARATOR + "region r where r.ID + 3 > 4", cqa).execute();
     server.invoke(() -> {
-      Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
       regionOnServer.put(0, new Portfolio(0));
       regionOnServer.put(1, new Portfolio(1));
       regionOnServer.put(2, new Portfolio(2));
@@ -124,7 +169,7 @@ public class CQDistributedTest implements Serializable {
   public void cqUsingSubtractShouldFireEventsWhenFilterCriteriaIsMet() throws Exception {
     qs.newCq("Select * from " + SEPARATOR + "region r where r.ID - 3 < 0", cqa).execute();
     server.invoke(() -> {
-      Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
       regionOnServer.put(0, new Portfolio(0));
       regionOnServer.put(1, new Portfolio(1));
       regionOnServer.put(2, new Portfolio(2));
@@ -140,7 +185,7 @@ public class CQDistributedTest implements Serializable {
   public void cqUsingDivideShouldFireEventsWhenFilterCriteriaIsMet() throws Exception {
     qs.newCq("Select * from " + SEPARATOR + "region r where r.ID / 2 = 1", cqa).execute();
     server.invoke(() -> {
-      Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
       regionOnServer.put(0, new Portfolio(0));
       regionOnServer.put(1, new Portfolio(1));
       regionOnServer.put(2, new Portfolio(2));
@@ -156,7 +201,7 @@ public class CQDistributedTest implements Serializable {
   public void cqUsingMultiplyShouldFireEventsWhenFilterCriteriaIsMet() throws Exception {
     qs.newCq("Select * from " + SEPARATOR + "region r where r.ID * 2 > 3", cqa).execute();
     server.invoke(() -> {
-      Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
       regionOnServer.put(0, new Portfolio(0));
       regionOnServer.put(1, new Portfolio(1));
       regionOnServer.put(2, new Portfolio(2));
@@ -172,7 +217,7 @@ public class CQDistributedTest implements Serializable {
   public void cqExecuteWithInitialResultsWithValuesMatchingPrimaryKeyShouldNotThrowClassCastException()
       throws Exception {
     server.invoke(() -> {
-      Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
       ClusterStartupRule.getCache().getQueryService().createKeyIndex("PrimaryKeyIndex", "ID",
           SEPARATOR + "region");
       regionOnServer.put(0, new Portfolio(0));
@@ -193,7 +238,7 @@ public class CQDistributedTest implements Serializable {
     qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();
 
     server.invoke(() -> {
-      Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
       final CacheTransactionManager txMgr =
           ClusterStartupRule.getCache().getCacheTransactionManager();
 
@@ -222,7 +267,7 @@ public class CQDistributedTest implements Serializable {
     qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();
 
     server.invoke(() -> {
-      Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      Region<Integer, Portfolio> regionOnServer = ClusterStartupRule.getCache().getRegion("region");
       // CREATE new entry
       regionOnServer.put(0, new Portfolio(1));