You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2020/08/07 10:40:49 UTC

[geode] branch develop updated: GEODE-8292: Added check if key is destroyed in CQResults (#5426)

This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2b86f66  GEODE-8292: Added check if key is destroyed in CQResults (#5426)
2b86f66 is described below

commit 2b86f66a6e2c741e9a825a4d8185d84d53c9fdee
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Fri Aug 7 12:39:52 2020 +0200

    GEODE-8292: Added check if key is destroyed in CQResults (#5426)
---
 .../geode/cache/query/internal/cq/ServerCQ.java    |  5 +++
 .../geode/cache/query/cq/CQDistributedTest.java    | 41 ++++++++++++++++++++--
 .../cache/query/cq/internal/CqServiceImpl.java     |  3 +-
 .../cache/query/cq/internal/ServerCQImpl.java      |  8 +++++
 .../query/cq/internal/ServerCQResultsCache.java    |  2 ++
 .../cq/internal/ServerCQResultsCacheNoOpImpl.java  |  5 +++
 .../ServerCQResultsCachePartitionRegionImpl.java   |  5 +++
 .../ServerCQResultsCacheReplicateRegionImpl.java   |  5 +++
 8 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
index 08bfba3..1f004f9 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
@@ -77,6 +77,11 @@ public interface ServerCQ extends InternalCqQuery {
   boolean isOldValueRequiredForQueryProcessing(Object key);
 
   /**
+   * Returns true if key is in destroy token mode.
+   */
+  boolean isKeyDestroyed(Object key);
+
+  /**
    * Closes the Query. On Client side, sends the cq close request to server. On Server side, takes
    * care of repository cleanup.
    *
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
index dd50836..f46b714 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
@@ -51,13 +51,14 @@ public class CQDistributedTest implements Serializable {
 
   private MemberVM locator;
   private MemberVM server;
-  private int locator1Port;
+  private MemberVM server2;
 
   private CqAttributes cqa;
   private QueryService qs;
   private TestCqListener testListener;
   private TestCqListener2 testListener2;
 
+  private Region region;
 
   @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
@@ -69,8 +70,11 @@ public class CQDistributedTest implements Serializable {
     server = clusterStartupRule.startServerVM(3, locator1Port);
     createServerRegion(server, RegionShortcut.PARTITION);
 
+    server2 = clusterStartupRule.startServerVM(4, locator1Port);
+    createServerRegion(server2, RegionShortcut.PARTITION);
+
     ClientCache clientCache = createClientCache(locator1Port);
-    Region region =
+    region =
         clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
 
     qs = clientCache.getQueryService();
@@ -233,6 +237,39 @@ public class CQDistributedTest implements Serializable {
     await().untilAsserted(() -> assertThat(testListener2.onEventUpdateCalls).isEqualTo(0));
   }
 
+  @Test
+  public void cqWithTransaction2Servers() throws Exception {
+
+    qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();
+
+    final CacheTransactionManager txMgr = region.getCache().getCacheTransactionManager();
+
+    // CREATE new entry
+    for (int i = 0; i < 4; i++) {
+      txMgr.begin();
+      region.put(i, new Portfolio(1));
+      txMgr.commit();
+    }
+
+    // UPDATE
+    for (int i = 0; i < 4; i++) {
+      txMgr.begin();
+      region.put(i, new Portfolio(0));
+      txMgr.commit();
+    }
+
+    // CREATE
+    for (int i = 0; i < 4; i++) {
+      txMgr.begin();
+      region.put(i, new Portfolio(1));
+      txMgr.commit();
+    }
+
+    await().untilAsserted(() -> assertThat(testListener2.onEventCreateCalls).isEqualTo(8));
+    await().untilAsserted(() -> assertThat(testListener2.onEventUpdateCalls).isEqualTo(0));
+  }
+
+
   private class TestCqListener implements CqListener, Serializable {
     public int onEventCalls = 0;
 
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
index bf09bd7..554a49c 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
@@ -1367,7 +1367,8 @@ public class CqServiceImpl implements CqService {
                 // Partitioned Regions. Once this is added remove the check
                 // with PR region.
                 if (cQuery.isCqResultsCacheInitialized()) {
-                  b_cqResults_oldValue = cQuery.isPartOfCqResult(eventKey);
+                  b_cqResults_oldValue =
+                      (cQuery.isPartOfCqResult(eventKey) && !cQuery.isKeyDestroyed(eventKey));
                   // For PR if not found in cache, apply the query on old value.
                   // Also apply if the query was not executed during cq execute
                   if ((cQuery.isPR || !CqServiceImpl.EXECUTE_QUERY_DURING_INIT)
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
index e48ec2b..2f702ac 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
@@ -333,6 +333,14 @@ public class ServerCQImpl extends CqQueryImpl implements DataSerializable, Serve
     return serverCQResultsCache.isOldValueRequiredForQueryProcessing(key);
   }
 
+  @Override
+  public boolean isKeyDestroyed(Object key) {
+    if (!serverCQResultsCache.contains(key)) {
+      return false;
+    }
+    return serverCQResultsCache.isKeyDestroyed(key);
+  }
+
   /**
    * Closes the Query. On Client side, sends the cq close request to server. On Server side, takes
    * care of repository cleanup.
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
index 87c9693..02db2be 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
@@ -44,5 +44,7 @@ interface ServerCQResultsCache {
 
   boolean isOldValueRequiredForQueryProcessing(Object key);
 
+  boolean isKeyDestroyed(Object key);
+
   void clear();
 }
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
index 4f0f1cf..7248d5a 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
@@ -64,5 +64,10 @@ class ServerCQResultsCacheNoOpImpl implements ServerCQResultsCache {
   }
 
   @Override
+  public boolean isKeyDestroyed(Object key) {
+    return false;
+  }
+
+  @Override
   public void clear() {}
 }
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
index 882a923..8f7279c 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
@@ -127,6 +127,11 @@ class ServerCQResultsCachePartitionRegionImpl implements ServerCQResultsCache {
   }
 
   @Override
+  public boolean isKeyDestroyed(Object key) {
+    return (cqResultKeys.get(key) == Token.DESTROYED);
+  }
+
+  @Override
   public void clear() {
     cqResultKeys.clear();
   }
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
index 3d987b8..dd66d03 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
@@ -167,6 +167,11 @@ class ServerCQResultsCacheReplicateRegionImpl implements ServerCQResultsCache {
   }
 
   @Override
+  public boolean isKeyDestroyed(Object key) {
+    return false;
+  }
+
+  @Override
   public void clear() {
     // Clean-up the CQ Results Cache.
     synchronized (LOCK) {