Posted to commits@geode.apache.org by ag...@apache.org on 2016/06/08 20:27:37 UTC

incubator-geode git commit: GEODE-1495: Changes are made to remove the cached destroyed token/events from the CQ.

Repository: incubator-geode
Updated Branches:
  refs/heads/develop afa7cc815 -> e4994c7b3


GEODE-1495: Changes are made to remove the cached destroyed token/events from the CQ.

The CQEvents seen by CQs are cached in order to avoid applying CQ queries to old values.

In the case of a destroy CQEvent, the cached event is marked with a destroy token and removed from
the cache after the CQEvent is added to the HAQueue.
This works fine for CQs registered locally, but for CQs registered on a peer server the destroy
tokens were not removed from the cache, which resulted in wrong CQEvents being generated for
subsequent operations.
This change removes the destroy CQEvent from the cache after the CQEvent is distributed to the
peer servers.
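
To make the lifecycle concrete, here is a minimal sketch of a result-key
cache with destroy tokens (the CqResultKeyCache class, markAsDestroyed, and
the token-mode semantics are illustrative assumptions; only the
removeFromCqResultKeys(key, true) call is taken from the diff below):

    // Illustrative sketch only; not Geode's actual implementation.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class CqResultKeyCache {
      // Sentinel marking a key whose entry was destroyed but whose removal
      // must wait until the destroy event has been queued or distributed.
      private static final Object DESTROYED_TOKEN = new Object();
      private final Map<Object, Object> resultKeys = new ConcurrentHashMap<>();

      // Called when a destroy event is seen: keep the key, marked destroyed,
      // so duplicate events for it can still be recognized in the meantime.
      void markAsDestroyed(Object key) {
        resultKeys.put(key, DESTROYED_TOKEN);
      }

      // In token mode (isTokenMode == true), remove the key only if it was
      // previously marked destroyed; this is the cleanup this commit adds on
      // the peer-distribution path.
      void removeFromCqResultKeys(Object key, boolean isTokenMode) {
        if (!isTokenMode || resultKeys.get(key) == DESTROYED_TOKEN) {
          resultKeys.remove(key);
        }
      }
    }

Before this change the token-mode removal only ran on the local HAQueue path,
so for CQs registered on peer servers the destroyed keys stayed in the cache.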


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/e4994c7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/e4994c7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/e4994c7b

Branch: refs/heads/develop
Commit: e4994c7b3fd42b6804b909796b8589729e9861ea
Parents: afa7cc8
Author: Anil <ag...@pivotal.io>
Authored: Tue Jun 7 18:31:17 2016 -0700
Committer: Anil <ag...@pivotal.io>
Committed: Wed Jun 8 12:23:07 2016 -0700

----------------------------------------------------------------------
 .../cache/DistributedCacheOperation.java        | 52 +++++++++++++
 .../query/cq/dunit/PrCqUsingPoolDUnitTest.java  | 78 +++++++++++++++++++-
 2 files changed, 129 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e4994c7b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
index 83d4c5a..f51717d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
@@ -43,6 +43,7 @@ import com.gemstone.gemfire.cache.EntryNotFoundException;
 import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.ServerCQ;
 import com.gemstone.gemfire.distributed.internal.DM;
 import com.gemstone.gemfire.distributed.internal.DirectReplyProcessor;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
@@ -59,12 +60,14 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.PutAllMessage;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl.OldValueImporter;
 import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
 import com.gemstone.gemfire.internal.cache.UpdateOperation.UpdateMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage;
 import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberID;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
 import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag;
 import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
@@ -632,6 +635,10 @@ public abstract class DistributedCacheOperation {
         }
       }
 
+      if (region.isUsedForPartitionedRegionBucket() && filterRouting != null) {
+        removeDestroyTokensFromCqResultKeys(filterRouting);
+      }
+
     } catch (CancelException e) {
       if (logger.isDebugEnabled()) {
         logger.debug("distribution of message aborted by shutdown: {}", this);
@@ -651,6 +658,51 @@ public abstract class DistributedCacheOperation {
     }
   }
 
+
+  /**
+   * Cleans up destroyed events in the CQ result cache for remote CQs.
+   * While maintaining the CQ result key cache, the keys of destroy
+   * events are marked as destroyed instead of being removed; this
+   * handles the arrival of duplicate events. Keys marked as destroyed
+   * are removed after the event is placed in the client's HAQueue or
+   * distributed to the peers.
+   *
+   * This is similar to CacheClientNotifier.removeDestroyTokensFromCqResultKeys(),
+   * which handles destroyed events for local CQs.
+   */
+  private void removeDestroyTokensFromCqResultKeys(FilterRoutingInfo filterRouting) {
+    for (InternalDistributedMember m : filterRouting.getMembers()) {
+      FilterInfo filterInfo = filterRouting.getFilterInfo(m);
+      if (filterInfo.getCQs() == null) {
+        continue;
+      }
+
+      CacheProfile cf = (CacheProfile) ((BucketRegion)getRegion()).getPartitionedRegion()
+          .getCacheDistributionAdvisor().getProfile(m);
+
+      if (cf == null || cf.filterProfile == null || cf.filterProfile.isLocalProfile() 
+          || cf.filterProfile.getCqMap().isEmpty()) {
+        continue;
+      }
+
+
+      for (Object value : cf.filterProfile.getCqMap().values()) {
+        ServerCQ cq = (ServerCQ)value;
+
+        for (Map.Entry<Long, Integer> e: filterInfo.getCQs().entrySet()) {
+          Long cqID = e.getKey();
+          // For the CQs for which this event is a destroy CQEvent,
+          // remove the entry from the CQ result cache.
+          if (cq.getFilterID() == cqID && (e.getValue().equals(Integer.valueOf(
+              MessageType.LOCAL_DESTROY)))) {
+            cq.removeFromCqResultKeys(((EntryEventImpl)event).getKey(), true);
+          }
+        }
+      }
+    }
+  }
+
+
   /**
    * Get the adjunct receivers for a partitioned region operation
    * 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e4994c7b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PrCqUsingPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PrCqUsingPoolDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PrCqUsingPoolDUnitTest.java
index ffdd734..0af556f 100644
--- a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PrCqUsingPoolDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PrCqUsingPoolDUnitTest.java
@@ -317,7 +317,70 @@ public class PrCqUsingPoolDUnitTest extends CacheTestCase {
     cqHelper.closeServer(server2);
     cqHelper.closeServer(server1);
   }
-  
+
+  /**
+   * Test for CQs registered on an accessor bridge server (local max memory
+   * zero), where updates result in destroy CQEvents.
+   */
+  public void testCqOnAccessorServerWithUpdatesResultingInDestroyedCQEvents() throws Exception {
+
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+
+    // creating an accessor vm with Bridge Server installed.
+    createServer(server1,true);
+
+    createServer(server2);
+
+    // create client
+    final int port = server1.invoke(() -> PrCqUsingPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+
+    String poolName = "testPartitionedCqOnAccessorBridgeServer";
+    createPool(client, poolName, host0, port);
+
+    // register cq.
+    createCQ(client, poolName, "testCQEvents_0", cqs[0]);
+    cqHelper.executeCQ(client, "testCQEvents_0", false, null);
+
+    // create values
+    final int size = 10;
+    createValues(server1, regions[0], size);
+
+    // do updates that result in destroy CQEvents.
+    updateValuesToGenerateDestroyCQEvent(server1, regions[0], size);
+
+    // validate cq for expected Destroy CQ events.
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes: */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ size,
+        /* totalEvents: */ (size+size));
+
+    // Create values to generate Create CQ Events.
+    createValues(server1, regions[0], size);
+
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size * 2,
+        /* deletes: */ 0,
+        /* queryInserts: */ size * 2,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ size,
+        /* totalEvents: */ CqQueryUsingPoolDUnitTest.noTest);
+
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+
+
   /**
    * test for registering cqs on single Bridge server hosting all the data. This
    * will generate all the events locally and should always have the old value 
@@ -1994,7 +2057,20 @@ public class PrCqUsingPoolDUnitTest extends CacheTestCase {
       }
     });
   }
+
  
+  public void updateValuesToGenerateDestroyCQEvent(VM vm, final String regionName, final int size) {
+    vm.invoke(new CacheSerializableRunnable("Update values for region : "+regionName) {
+      public void run2() throws CacheException {
+        Region region1 = getRootRegion().getSubregion(regionName);
+        for (int i = 1; i <= size; i++) {
+          region1.put(KEY+i, new Portfolio(i * -1));
+        }
+        LogWriterUtils.getLogWriter().info("### Number of Entries in Region :" + region1.keys().size());
+      }
+    });
+  }
+
   public void createValuesPutall(VM vm, final String regionName, final int size) {
     vm.invoke(new CacheSerializableRunnable("Create values for region : "+regionName) {
       public void run2() throws CacheException {